331e6199df
Summary: When users fail to open a DB with file lock failure, it is sometimes hard for users to debug. We now include the time the lock is acquired and the thread ID that acquired the lock, to help users debug problems like this. Default Env's thread ID is used. Since type of lockedFiles is changed, rename it to follow naming convention too. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6507 Test Plan: Add a unit test and improve an existing test to validate the case. Differential Revision: D20378333 fbshipit-source-id: 312fe0e9733fd1d1e9969c321b90ce523cf4708a
2986 lines
99 KiB
C++
2986 lines
99 KiB
C++
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
|
// This source code is licensed under both the GPLv2 (found in the
|
|
// COPYING file in the root directory) and Apache 2.0 License
|
|
// (found in the LICENSE.Apache file in the root directory).
|
|
//
|
|
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
|
#include "db/db_test_util.h"
|
|
#include "port/stack_trace.h"
|
|
#include "rocksdb/perf_context.h"
|
|
#include "rocksdb/utilities/debug.h"
|
|
#include "table/block_based/block_based_table_reader.h"
|
|
#include "table/block_based/block_builder.h"
|
|
#include "test_util/fault_injection_test_env.h"
|
|
#if !defined(ROCKSDB_LITE)
|
|
#include "test_util/sync_point.h"
|
|
#endif
|
|
|
|
namespace ROCKSDB_NAMESPACE {
|
|
|
|
class DBBasicTest : public DBTestBase {
|
|
public:
|
|
DBBasicTest() : DBTestBase("/db_basic_test") {}
|
|
};
|
|
|
|
// Opens dbname_ a second time and expects the attempt to be rejected with a
// plain IOError (no sub-code) whose message contains "lock ".
// NOTE(review): relies on the fixture already holding dbname_ open -- confirm
// in DBTestBase.
TEST_F(DBBasicTest, OpenWhenOpen) {
  Options opts = CurrentOptions();
  opts.env = env_;

  DB* second_handle = nullptr;
  Status open_status = DB::Open(opts, dbname_, &second_handle);

  ASSERT_EQ(Status::Code::kIOError, open_status.code());
  ASSERT_EQ(Status::SubCode::kNone, open_status.subcode());
  const char* const message = open_status.getState();
  ASSERT_TRUE(strstr(message, "lock ") != nullptr);

  // Deleting a null pointer is a no-op, so this is safe on the expected path.
  delete second_handle;
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
// Writes three entries (two distinct keys), then verifies that they can be
// read back through a read-only reopen, both before and after the data has
// been flushed by a regular reopen. Also checks that SyncWAL() reports
// NotSupported on the read-only handle.
TEST_F(DBBasicTest, ReadOnlyDB) {
  ASSERT_OK(Put("foo", "v1"));
  ASSERT_OK(Put("bar", "v2"));
  ASSERT_OK(Put("foo", "v3"));
  Close();

  auto options = CurrentOptions();
  // Use a gtest assertion instead of assert() so the check still runs when
  // the test is compiled with NDEBUG.
  ASSERT_TRUE(options.env == env_);
  ASSERT_OK(ReadOnlyReopen(options));
  ASSERT_EQ("v3", Get("foo"));
  ASSERT_EQ("v2", Get("bar"));
  {
    // Scoped owner: the iterator is released before Close() below, and also
    // when an ASSERT_* returns early from the test body.
    std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
    int count = 0;
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      ASSERT_OK(iter->status());
      ++count;
    }
    // Valid() also turns false on error, so confirm the scan ended cleanly.
    ASSERT_OK(iter->status());
    ASSERT_EQ(count, 2);
  }
  Close();

  // Reopen and flush memtable.
  Reopen(options);
  ASSERT_OK(Flush());
  Close();
  // Now check keys in read only mode.
  ASSERT_OK(ReadOnlyReopen(options));
  ASSERT_EQ("v3", Get("foo"));
  ASSERT_EQ("v2", Get("bar"));
  ASSERT_TRUE(db_->SyncWAL().IsNotSupported());
}
|
|
|
|
// Same scenario as ReadOnlyDB but with options.write_dbid_to_manifest set.
// Additionally checks that the DB identity read through the first read-only
// open equals the identity read through the second one.
TEST_F(DBBasicTest, ReadOnlyDBWithWriteDBIdToManifestSet) {
  ASSERT_OK(Put("foo", "v1"));
  ASSERT_OK(Put("bar", "v2"));
  ASSERT_OK(Put("foo", "v3"));
  Close();

  auto options = CurrentOptions();
  options.write_dbid_to_manifest = true;
  // Use a gtest assertion instead of assert() so the check still runs when
  // the test is compiled with NDEBUG.
  ASSERT_TRUE(options.env == env_);
  ASSERT_OK(ReadOnlyReopen(options));
  std::string db_id1;
  // The identity lookup must succeed; otherwise the comparison at the end
  // could pass trivially on two empty strings.
  ASSERT_OK(db_->GetDbIdentity(db_id1));
  ASSERT_EQ("v3", Get("foo"));
  ASSERT_EQ("v2", Get("bar"));
  {
    // Scoped owner: the iterator is released before Close() below, and also
    // when an ASSERT_* returns early from the test body.
    std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
    int count = 0;
    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
      ASSERT_OK(iter->status());
      ++count;
    }
    // Valid() also turns false on error, so confirm the scan ended cleanly.
    ASSERT_OK(iter->status());
    ASSERT_EQ(count, 2);
  }
  Close();

  // Reopen and flush memtable.
  Reopen(options);
  ASSERT_OK(Flush());
  Close();
  // Now check keys in read only mode.
  ASSERT_OK(ReadOnlyReopen(options));
  ASSERT_EQ("v3", Get("foo"));
  ASSERT_EQ("v2", Get("bar"));
  ASSERT_TRUE(db_->SyncWAL().IsNotSupported());
  std::string db_id2;
  ASSERT_OK(db_->GetDbIdentity(db_id2));
  ASSERT_EQ(db_id1, db_id2);
}
|
|
|
|
// Exercises read-only opens against different LSM shapes and checks, via the
// error text returned by Put(), which read-only implementation was selected:
// the generic read-only DB ("... in read only mode.") or the compacted DB
// ("... in compacted db mode."). Also verifies Get() and MultiGet() results
// in compacted-DB mode.
TEST_F(DBBasicTest, CompactedDB) {
  const uint64_t kFileSize = 1 << 20;
  // Error strings that distinguish the two read-only implementations.
  const std::string kReadOnlyModeError =
      "Not implemented: Not supported operation in read only mode.";
  const std::string kCompactedModeError =
      "Not implemented: Not supported in compacted db mode.";

  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.write_buffer_size = kFileSize;
  options.target_file_size_base = kFileSize;
  options.max_bytes_for_level_base = 1 << 30;
  options.compression = kNoCompression;
  Reopen(options);
  // 1 L0 file, use CompactedDB if max_open_files = -1
  ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, '1')));
  ASSERT_OK(Flush());
  Close();
  ASSERT_OK(ReadOnlyReopen(options));
  Status s = Put("new", "value");
  ASSERT_EQ(s.ToString(), kReadOnlyModeError);
  ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa"));
  Close();
  options.max_open_files = -1;
  ASSERT_OK(ReadOnlyReopen(options));
  s = Put("new", "value");
  ASSERT_EQ(s.ToString(), kCompactedModeError);
  ASSERT_EQ(DummyString(kFileSize / 2, '1'), Get("aaa"));
  Close();
  Reopen(options);
  // Add more L0 files
  ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, '2')));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("aaa", DummyString(kFileSize / 2, 'a')));
  ASSERT_OK(Flush());
  ASSERT_OK(Put("bbb", DummyString(kFileSize / 2, 'b')));
  ASSERT_OK(Put("eee", DummyString(kFileSize / 2, 'e')));
  ASSERT_OK(Flush());
  Close();

  ASSERT_OK(ReadOnlyReopen(options));
  // Fallback to read-only DB
  s = Put("new", "value");
  ASSERT_EQ(s.ToString(), kReadOnlyModeError);
  Close();

  // Full compaction
  Reopen(options);
  // Add more keys
  ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f')));
  ASSERT_OK(Put("hhh", DummyString(kFileSize / 2, 'h')));
  ASSERT_OK(Put("iii", DummyString(kFileSize / 2, 'i')));
  ASSERT_OK(Put("jjj", DummyString(kFileSize / 2, 'j')));
  // A failed compaction would otherwise only surface indirectly through the
  // file-count check below.
  ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
  ASSERT_EQ(3, NumTableFilesAtLevel(1));
  Close();

  // CompactedDB
  ASSERT_OK(ReadOnlyReopen(options));
  s = Put("new", "value");
  ASSERT_EQ(s.ToString(), kCompactedModeError);
  ASSERT_EQ("NOT_FOUND", Get("abc"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'a'), Get("aaa"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'b'), Get("bbb"));
  ASSERT_EQ("NOT_FOUND", Get("ccc"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'e'), Get("eee"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'f'), Get("fff"));
  ASSERT_EQ("NOT_FOUND", Get("ggg"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'h'), Get("hhh"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'i'), Get("iii"));
  ASSERT_EQ(DummyString(kFileSize / 2, 'j'), Get("jjj"));
  ASSERT_EQ("NOT_FOUND", Get("kkk"));

  // MultiGet: alternate present and absent keys.
  std::vector<std::string> values;
  std::vector<Status> status_list = dbfull()->MultiGet(
      ReadOptions(),
      std::vector<Slice>({Slice("aaa"), Slice("ccc"), Slice("eee"),
                          Slice("ggg"), Slice("iii"), Slice("kkk")}),
      &values);
  ASSERT_EQ(status_list.size(), static_cast<uint64_t>(6));
  ASSERT_EQ(values.size(), static_cast<uint64_t>(6));
  ASSERT_OK(status_list[0]);
  ASSERT_EQ(DummyString(kFileSize / 2, 'a'), values[0]);
  ASSERT_TRUE(status_list[1].IsNotFound());
  ASSERT_OK(status_list[2]);
  ASSERT_EQ(DummyString(kFileSize / 2, 'e'), values[2]);
  ASSERT_TRUE(status_list[3].IsNotFound());
  ASSERT_OK(status_list[4]);
  ASSERT_EQ(DummyString(kFileSize / 2, 'i'), values[4]);
  ASSERT_TRUE(status_list[5].IsNotFound());

  Reopen(options);
  // Add a key
  ASSERT_OK(Put("fff", DummyString(kFileSize / 2, 'f')));
  Close();
  ASSERT_OK(ReadOnlyReopen(options));
  s = Put("new", "value");
  ASSERT_EQ(s.ToString(), kReadOnlyModeError);
}
|
|
|
|
// Fills column family 1 until at least one table file shows up at level 2,
// then checks that reopening with num_levels = 1 is rejected with
// InvalidArgument while reopening with num_levels = 10 succeeds.
TEST_F(DBBasicTest, LevelLimitReopen) {
  Options opts = CurrentOptions();
  CreateAndReopenWithCF({"pikachu"}, opts);

  const std::string one_mb_value(1024 * 1024, ' ');
  for (int next_key = 0; NumTableFilesAtLevel(2, 1) == 0; ++next_key) {
    ASSERT_OK(Put(1, Key(next_key), one_mb_value));
    dbfull()->TEST_WaitForFlushMemTable();
    dbfull()->TEST_WaitForCompact();
  }

  // Too few levels for the existing data.
  opts.num_levels = 1;
  opts.max_bytes_for_level_multiplier_additional.resize(1, 1);
  const Status too_few_levels =
      TryReopenWithColumnFamilies({"default", "pikachu"}, opts);
  ASSERT_TRUE(too_few_levels.IsInvalidArgument());
  ASSERT_EQ(too_few_levels.ToString(),
            "Invalid argument: db has more levels than options.num_levels");

  // Plenty of levels.
  opts.num_levels = 10;
  opts.max_bytes_for_level_multiplier_additional.resize(10, 1);
  ASSERT_OK(TryReopenWithColumnFamilies({"default", "pikachu"}, opts));
}
|
|
#endif // ROCKSDB_LITE
|
|
|
|
// Put, overwrite and delete a single key in column family 1, checking the
// value visible through Get() after each step. Repeated for every option
// configuration produced by ChangeOptions().
TEST_F(DBBasicTest, PutDeleteGet) {
  const std::string key = "foo";
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());

    ASSERT_OK(Put(1, key, "v1"));
    ASSERT_EQ("v1", Get(1, key));

    ASSERT_OK(Put(1, key, "v2"));
    ASSERT_EQ("v2", Get(1, key));

    ASSERT_OK(Delete(1, key));
    ASSERT_EQ("NOT_FOUND", Get(1, key));
  } while (ChangeOptions());
}
|
|
|
|
// Writes two keys to column family 1, single-deletes the first one and checks
// that it is no longer visible.
TEST_F(DBBasicTest, PutSingleDeleteGet) {
  const std::string deleted_key = "foo";
  const std::string kept_key = "foo2";
  // Skip FIFO and universal compaction because they do not apply to the test
  // case. Skip MergePut because single delete does not get removed when it
  // encounters a merge.
  const int skip_mask =
      kSkipFIFOCompaction | kSkipUniversalCompaction | kSkipMergePut;
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());

    ASSERT_OK(Put(1, deleted_key, "v1"));
    ASSERT_EQ("v1", Get(1, deleted_key));

    ASSERT_OK(Put(1, kept_key, "v2"));
    ASSERT_EQ("v2", Get(1, kept_key));

    ASSERT_OK(SingleDelete(1, deleted_key));
    ASSERT_EQ("NOT_FOUND", Get(1, deleted_key));
  } while (ChangeOptions(skip_mask));
}
|
|
|
|
// It is possible to produce empty flushes when using single deletes. Tests
// whether empty flushes cause issues: a Put followed by a SingleDelete of the
// same key is flushed, after which no entry for the key may remain.
TEST_F(DBBasicTest, EmptyFlush) {
  do {
    Options options = CurrentOptions();
    options.disable_auto_compactions = true;
    CreateAndReopenWithCF({"pikachu"}, options);

    // Both writes must succeed for the flush below to exercise the
    // put/single-delete pair rather than an untouched memtable.
    ASSERT_OK(Put(1, "a", Slice()));
    ASSERT_OK(SingleDelete(1, "a"));
    ASSERT_OK(Flush(1));

    ASSERT_EQ("[ ]", AllEntriesFor("a", 1));
    // Skip FIFO and universal compaction as they do not apply to the test
    // case. Skip MergePut because merges cannot be combined with single
    // deletions.
  } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |
                         kSkipMergePut));
}
|
|
|
|
// After flushing column family 1, the key written there must still be
// readable from that column family and must not be visible in column
// family 0.
TEST_F(DBBasicTest, GetFromVersions) {
  const std::string key = "foo";
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    ASSERT_OK(Put(1, key, "v1"));
    ASSERT_OK(Flush(1));

    const std::string in_cf1 = Get(1, key);
    ASSERT_EQ("v1", in_cf1);
    const std::string in_default_cf = Get(0, key);
    ASSERT_EQ("NOT_FOUND", in_default_cf);
  } while (ChangeOptions());
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
// Checks that a snapshot keeps returning the value that was current when it
// was taken, both before and after a flush, while plain reads see the newer
// value.
TEST_F(DBBasicTest, GetSnapshot) {
  anon::OptionsOverride override_opts;
  override_opts.skip_policy = kSkipNoSnapshot;
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions(override_opts));
    // Try with both a short key and a long key
    const std::string test_keys[] = {std::string("foo"),
                                     std::string(200, 'x')};
    for (const std::string& key : test_keys) {
      ASSERT_OK(Put(1, key, "v1"));
      const Snapshot* snap = db_->GetSnapshot();
      ASSERT_OK(Put(1, key, "v2"));

      // Memtable reads.
      ASSERT_EQ("v2", Get(1, key));
      ASSERT_EQ("v1", Get(1, key, snap));

      // Same expectations once the data has been flushed.
      ASSERT_OK(Flush(1));
      ASSERT_EQ("v2", Get(1, key));
      ASSERT_EQ("v1", Get(1, key, snap));

      db_->ReleaseSnapshot(snap);
    }
  } while (ChangeOptions());
}
|
|
#endif // ROCKSDB_LITE
|
|
|
|
// Reopens the database through the fixture and then verifies that a second
// DB::Open() on the same path fails. On Linux the error text must also
// contain "lock hold by current process".
TEST_F(DBBasicTest, CheckLock) {
  do {
    Options options = CurrentOptions();
    ASSERT_OK(TryReopen(options));

    // second open should fail
    DB* localdb = nullptr;
    Status s = DB::Open(options, dbname_, &localdb);
    // Take ownership right away: if the open unexpectedly succeeds, the handle
    // is released even though ASSERT_NOK returns early from the test.
    std::unique_ptr<DB> localdb_guard(localdb);
    ASSERT_NOK(s);
#ifdef OS_LINUX
    ASSERT_TRUE(s.ToString().find("lock hold by current process") !=
                std::string::npos);
#endif  // OS_LINUX
  } while (ChangeCompactOptions());
}
|
|
|
|
// Writes with the WAL disabled into column family 1 while the options ask for
// up to four write buffers with at least three merged per flush, and checks
// that explicit flushes succeed and both keys stay readable.
TEST_F(DBBasicTest, FlushMultipleMemtable) {
  do {
    WriteOptions no_wal;
    no_wal.disableWAL = true;

    Options opts = CurrentOptions();
    opts.max_write_buffer_number = 4;
    opts.min_write_buffer_number_to_merge = 3;
    opts.max_write_buffer_size_to_maintain = -1;
    CreateAndReopenWithCF({"pikachu"}, opts);

    ColumnFamilyHandle* const cf = handles_[1];
    ASSERT_OK(dbfull()->Put(no_wal, cf, "foo", "v1"));
    ASSERT_OK(Flush(1));
    ASSERT_OK(dbfull()->Put(no_wal, cf, "bar", "v1"));

    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_EQ("v1", Get(1, "bar"));
    ASSERT_OK(Flush(1));
  } while (ChangeCompactOptions());
}
|
|
|
|
// Flushes empty column families while the single HIGH and single LOW priority
// background threads are each occupied by a sleeping task, then writes, wakes
// the HIGH task and flushes again. The statement order matters: both sleeping
// tasks must be scheduled before the DB is reopened, and each is woken and
// waited for before the test returns.
TEST_F(DBBasicTest, FlushEmptyColumnFamily) {
  // Block flush thread and disable compaction thread
  env_->SetBackgroundThreads(1, Env::HIGH);
  env_->SetBackgroundThreads(1, Env::LOW);
  test::SleepingBackgroundTask low_pri_blocker;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &low_pri_blocker,
                 Env::Priority::LOW);
  test::SleepingBackgroundTask high_pri_blocker;
  env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
                 &high_pri_blocker, Env::Priority::HIGH);

  Options opts = CurrentOptions();
  // disable compaction
  opts.disable_auto_compactions = true;
  opts.max_write_buffer_number = 2;
  opts.min_write_buffer_number_to_merge = 1;
  opts.max_write_buffer_size_to_maintain =
      static_cast<int64_t>(opts.write_buffer_size);
  WriteOptions no_wal;
  no_wal.disableWAL = true;
  CreateAndReopenWithCF({"pikachu"}, opts);

  // Flushing the (still empty) column families must succeed even though the
  // background threads are blocked.
  ASSERT_OK(Flush(0));
  ASSERT_OK(Flush(1));

  // Insert can go through
  ASSERT_OK(dbfull()->Put(no_wal, handles_[0], "foo", "v1"));
  ASSERT_OK(dbfull()->Put(no_wal, handles_[1], "bar", "v1"));

  ASSERT_EQ("v1", Get(0, "foo"));
  ASSERT_EQ("v1", Get(1, "bar"));

  // Release the HIGH priority thread before flushing real data.
  high_pri_blocker.WakeUp();
  high_pri_blocker.WaitUntilDone();

  // Flush can still go through.
  ASSERT_OK(Flush(0));
  ASSERT_OK(Flush(1));

  low_pri_blocker.WakeUp();
  low_pri_blocker.WaitUntilDone();
}
|
|
|
|
// Alternates writes (WAL disabled, later enabled), flushes and reopens on
// column family 1 and checks the values that remain readable, plus a few perf
// context counters for reads served after a flush.
TEST_F(DBBasicTest, Flush) {
  const std::vector<std::string> cf_names = {"default", "pikachu"};
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    WriteOptions write_opts;
    write_opts.disableWAL = true;
    SetPerfLevel(kEnableTime);

    ASSERT_OK(dbfull()->Put(write_opts, handles_[1], "foo", "v1"));
    // Flush "foo" out of the memtable before writing "bar".
    ASSERT_OK(Flush(1));
    ASSERT_OK(dbfull()->Put(write_opts, handles_[1], "bar", "v1"));

    // Only the perf counters of this read are inspected; its result is
    // intentionally not checked here.
    get_perf_context()->Reset();
    Get(1, "foo");
    ASSERT_TRUE(
        static_cast<int>(get_perf_context()->get_from_output_files_time) > 0);
    ASSERT_EQ(2, static_cast<int>(get_perf_context()->get_read_bytes));

    ReopenWithColumnFamilies(cf_names, CurrentOptions());
    ASSERT_EQ("v1", Get(1, "foo"));
    ASSERT_EQ("v1", Get(1, "bar"));

    // Second round, still without WAL.
    write_opts.disableWAL = true;
    ASSERT_OK(dbfull()->Put(write_opts, handles_[1], "bar", "v2"));
    ASSERT_OK(dbfull()->Put(write_opts, handles_[1], "foo", "v2"));
    ASSERT_OK(Flush(1));

    ReopenWithColumnFamilies(cf_names, CurrentOptions());
    ASSERT_EQ("v2", Get(1, "bar"));
    get_perf_context()->Reset();
    ASSERT_EQ("v2", Get(1, "foo"));
    ASSERT_TRUE(
        static_cast<int>(get_perf_context()->get_from_output_files_time) > 0);

    // Third round with the WAL enabled.
    write_opts.disableWAL = false;
    ASSERT_OK(dbfull()->Put(write_opts, handles_[1], "bar", "v3"));
    ASSERT_OK(dbfull()->Put(write_opts, handles_[1], "foo", "v3"));
    ASSERT_OK(Flush(1));

    ReopenWithColumnFamilies(cf_names, CurrentOptions());
    // 'foo' should be there because its put
    // has WAL enabled.
    ASSERT_EQ("v3", Get(1, "foo"));
    ASSERT_EQ("v3", Get(1, "bar"));

    SetPerfLevel(kDisable);
  } while (ChangeCompactOptions());
}
|
|
|
|
// With max_manifest_file_size set to 10 bytes, checks that the current
// manifest file number grows after a flush and again after a reopen, and that
// the written data is still readable afterwards.
TEST_F(DBBasicTest, ManifestRollOver) {
  do {
    Options base_opts;
    base_opts.max_manifest_file_size = 10;  // 10 bytes
    const Options opts = CurrentOptions(base_opts);
    CreateAndReopenWithCF({"pikachu"}, opts);

    const std::string value1(1000, '1');
    const std::string value2(1000, '2');
    const std::string value3(1000, '3');
    ASSERT_OK(Put(1, "manifest_key1", value1));
    ASSERT_OK(Put(1, "manifest_key2", value2));
    ASSERT_OK(Put(1, "manifest_key3", value3));

    const uint64_t file_no_before_flush =
        dbfull()->TEST_Current_Manifest_FileNo();
    ASSERT_OK(Flush(1));  // This should trigger LogAndApply.
    const uint64_t file_no_after_flush =
        dbfull()->TEST_Current_Manifest_FileNo();
    ASSERT_GT(file_no_after_flush, file_no_before_flush);

    ReopenWithColumnFamilies({"default", "pikachu"}, opts);
    // check if a new manifest file got inserted or not.
    ASSERT_GT(dbfull()->TEST_Current_Manifest_FileNo(), file_no_after_flush);

    ASSERT_EQ(value1, Get(1, "manifest_key1"));
    ASSERT_EQ(value2, Get(1, "manifest_key2"));
    ASSERT_EQ(value3, Get(1, "manifest_key3"));
  } while (ChangeCompactOptions());
}
|
|
|
|
// The DB identity must be stable across a plain reopen. After the identity
// file is deleted, the identity seen on the next reopen must be unchanged
// when options.write_dbid_to_manifest is set and different otherwise.
TEST_F(DBBasicTest, IdentityAcrossRestarts1) {
  do {
    std::string original_id;
    ASSERT_OK(db_->GetDbIdentity(original_id));

    Options opts = CurrentOptions();
    Reopen(opts);
    std::string id_after_reopen;
    ASSERT_OK(db_->GetDbIdentity(id_after_reopen));
    // Nothing was removed, so the identity was not regenerated.
    ASSERT_TRUE(original_id == id_after_reopen);

    const std::string identity_file = IdentityFileName(dbname_);
    ASSERT_OK(env_->DeleteFile(identity_file));
    Reopen(opts);
    std::string id_after_delete;
    ASSERT_OK(db_->GetDbIdentity(id_after_delete));

    const bool same_as_original = (original_id == id_after_delete);
    if (opts.write_dbid_to_manifest) {
      ASSERT_TRUE(same_as_original);
    } else {
      // The identity was regenerated, so it must differ from the original.
      ASSERT_FALSE(same_as_original);
    }
  } while (ChangeCompactOptions());
}
|
|
|
|
// With options.write_dbid_to_manifest forced on, the DB identity must be
// unchanged both after a plain reopen and after a reopen that follows
// deletion of the identity file.
TEST_F(DBBasicTest, IdentityAcrossRestarts2) {
  do {
    std::string original_id;
    ASSERT_OK(db_->GetDbIdentity(original_id));

    Options opts = CurrentOptions();
    opts.write_dbid_to_manifest = true;
    Reopen(opts);
    std::string id_after_reopen;
    ASSERT_OK(db_->GetDbIdentity(id_after_reopen));
    // Nothing was removed, so the identity was not regenerated.
    ASSERT_TRUE(original_id == id_after_reopen);

    const std::string identity_file = IdentityFileName(dbname_);
    ASSERT_OK(env_->DeleteFile(identity_file));
    Reopen(opts);
    std::string id_after_delete;
    ASSERT_OK(db_->GetDbIdentity(id_after_delete));
    // The identity is expected to survive the loss of the identity file when
    // write_dbid_to_manifest is set, so it must still match the original.
    ASSERT_EQ(original_id, id_after_delete);
  } while (ChangeCompactOptions());
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
// Takes three snapshots (two raw, one ManagedSnapshot) interleaved with
// writes to column families 0 and 1, and checks after each step: the number
// of live snapshots, the time and sequence number of the oldest snapshot, and
// the value each snapshot observes.
TEST_F(DBBasicTest, Snapshot) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override));
    // Every write is checked: a silently failed Put would make the later
    // per-snapshot value assertions fail for the wrong reason.
    ASSERT_OK(Put(0, "foo", "0v1"));
    ASSERT_OK(Put(1, "foo", "1v1"));

    const Snapshot* s1 = db_->GetSnapshot();
    ASSERT_EQ(1U, GetNumSnapshots());
    uint64_t time_snap1 = GetTimeOldestSnapshots();
    ASSERT_GT(time_snap1, 0U);
    ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
    ASSERT_OK(Put(0, "foo", "0v2"));
    ASSERT_OK(Put(1, "foo", "1v2"));

    // Bump the env's addon_time_ counter before taking s2; the test later
    // expects the oldest-snapshot time to be greater than time_snap1 once s1
    // is released and s2 is the oldest.
    env_->addon_time_.fetch_add(1);

    const Snapshot* s2 = db_->GetSnapshot();
    ASSERT_EQ(2U, GetNumSnapshots());
    ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
    ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
    ASSERT_OK(Put(0, "foo", "0v3"));
    ASSERT_OK(Put(1, "foo", "1v3"));

    {
      // s3 is released automatically at the end of this scope.
      ManagedSnapshot s3(db_);
      ASSERT_EQ(3U, GetNumSnapshots());
      ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
      ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());

      ASSERT_OK(Put(0, "foo", "0v4"));
      ASSERT_OK(Put(1, "foo", "1v4"));
      ASSERT_EQ("0v1", Get(0, "foo", s1));
      ASSERT_EQ("1v1", Get(1, "foo", s1));
      ASSERT_EQ("0v2", Get(0, "foo", s2));
      ASSERT_EQ("1v2", Get(1, "foo", s2));
      ASSERT_EQ("0v3", Get(0, "foo", s3.snapshot()));
      ASSERT_EQ("1v3", Get(1, "foo", s3.snapshot()));
      ASSERT_EQ("0v4", Get(0, "foo"));
      ASSERT_EQ("1v4", Get(1, "foo"));
    }

    ASSERT_EQ(2U, GetNumSnapshots());
    ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
    ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
    ASSERT_EQ("0v1", Get(0, "foo", s1));
    ASSERT_EQ("1v1", Get(1, "foo", s1));
    ASSERT_EQ("0v2", Get(0, "foo", s2));
    ASSERT_EQ("1v2", Get(1, "foo", s2));
    ASSERT_EQ("0v4", Get(0, "foo"));
    ASSERT_EQ("1v4", Get(1, "foo"));

    db_->ReleaseSnapshot(s1);
    ASSERT_EQ("0v2", Get(0, "foo", s2));
    ASSERT_EQ("1v2", Get(1, "foo", s2));
    ASSERT_EQ("0v4", Get(0, "foo"));
    ASSERT_EQ("1v4", Get(1, "foo"));
    ASSERT_EQ(1U, GetNumSnapshots());
    ASSERT_LT(time_snap1, GetTimeOldestSnapshots());
    ASSERT_EQ(GetSequenceOldestSnapshots(), s2->GetSequenceNumber());

    db_->ReleaseSnapshot(s2);
    ASSERT_EQ(0U, GetNumSnapshots());
    ASSERT_EQ(GetSequenceOldestSnapshots(), 0);
    ASSERT_EQ("0v4", Get(0, "foo"));
    ASSERT_EQ("1v4", Get(1, "foo"));
  } while (ChangeOptions());
}
|
|
|
|
#endif // ROCKSDB_LITE
|
|
|
|
// Writes six versions of one key with two snapshots in between and checks,
// via AllEntriesFor(), which versions remain after a flush and after manual
// compactions as the snapshots are released one by one.
TEST_F(DBBasicTest, CompactBetweenSnapshots) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  do {
    Options options = CurrentOptions(options_override);
    options.disable_auto_compactions = true;
    CreateAndReopenWithCF({"pikachu"}, options);
    FillLevels("a", "z", 1);

    // Every write is checked so that the exact entry lists asserted below
    // cannot be wrong because of a silently failed Put.
    ASSERT_OK(Put(1, "foo", "first"));
    const Snapshot* snapshot1 = db_->GetSnapshot();
    ASSERT_OK(Put(1, "foo", "second"));
    ASSERT_OK(Put(1, "foo", "third"));
    ASSERT_OK(Put(1, "foo", "fourth"));
    const Snapshot* snapshot2 = db_->GetSnapshot();
    ASSERT_OK(Put(1, "foo", "fifth"));
    ASSERT_OK(Put(1, "foo", "sixth"));

    // All entries (including duplicates) exist
    // before any compaction or flush is triggered.
    ASSERT_EQ(AllEntriesFor("foo", 1),
              "[ sixth, fifth, fourth, third, second, first ]");
    ASSERT_EQ("sixth", Get(1, "foo"));
    ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
    ASSERT_EQ("first", Get(1, "foo", snapshot1));

    // After a flush, "second", "third" and "fifth" should
    // be removed
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth, first ]");

    // after we release the snapshot1, only two values left
    db_->ReleaseSnapshot(snapshot1);
    FillLevels("a", "z", 1);
    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
                                     nullptr, nullptr));

    // We have only one valid snapshot snapshot2. Since snapshot1 is
    // not valid anymore, "first" should be removed by a compaction.
    ASSERT_EQ("sixth", Get(1, "foo"));
    ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth ]");

    // after we release the snapshot2, only one value should be left
    db_->ReleaseSnapshot(snapshot2);
    FillLevels("a", "z", 1);
    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
                                     nullptr, nullptr));
    ASSERT_EQ("sixth", Get(1, "foo"));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth ]");
  } while (ChangeOptions(kSkipFIFOCompaction));
}
|
|
|
|
// Walks through the four combinations of "database exists or not" with
// create_if_missing / error_if_exists and checks whether DB::Open() succeeds
// and whether it hands back a handle.
TEST_F(DBBasicTest, DBOpen_Options) {
  Options opts = CurrentOptions();
  Close();
  Destroy(opts);

  DB* handle = nullptr;
  Status open_status;

  // Does not exist, and create_if_missing == false: error
  opts.create_if_missing = false;
  open_status = DB::Open(opts, dbname_, &handle);
  ASSERT_TRUE(open_status.ToString().find("does not exist") !=
              std::string::npos);
  ASSERT_TRUE(handle == nullptr);

  // Does not exist, and create_if_missing == true: OK
  opts.create_if_missing = true;
  open_status = DB::Open(opts, dbname_, &handle);
  ASSERT_OK(open_status);
  ASSERT_TRUE(handle != nullptr);

  delete handle;
  handle = nullptr;

  // Does exist, and error_if_exists == true: error
  opts.create_if_missing = false;
  opts.error_if_exists = true;
  open_status = DB::Open(opts, dbname_, &handle);
  ASSERT_TRUE(open_status.ToString().find("exists") != std::string::npos);
  ASSERT_TRUE(handle == nullptr);

  // Does exist, and error_if_exists == false: OK
  opts.create_if_missing = true;
  opts.error_if_exists = false;
  open_status = DB::Open(opts, dbname_, &handle);
  ASSERT_OK(open_status);
  ASSERT_TRUE(handle != nullptr);

  delete handle;
  handle = nullptr;
}
|
|
|
|
// Checks, through AllEntriesFor(), which versions of a key remain after a
// flush and after a manual compaction for several write patterns
// (delete/put, delete/delete, put/delete, put/put, and puts around a
// snapshot). Auto compactions are disabled so that only the explicit
// Flush()/CompactRange() calls change the entry lists.
TEST_F(DBBasicTest, CompactOnFlush) {
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  do {
    Options options = CurrentOptions(options_override);
    options.disable_auto_compactions = true;
    CreateAndReopenWithCF({"pikachu"}, options);

    // All writes, flushes and compactions are status-checked: each assertion
    // below depends on the exact sequence of operations having been applied.
    ASSERT_OK(Put(1, "foo", "v1"));
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v1 ]");

    // Write two new keys
    ASSERT_OK(Put(1, "a", "begin"));
    ASSERT_OK(Put(1, "z", "end"));
    ASSERT_OK(Flush(1));

    // Case1: Delete followed by a put
    ASSERT_OK(Delete(1, "foo"));
    ASSERT_OK(Put(1, "foo", "v2"));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, DEL, v1 ]");

    // After the current memtable is flushed, the DEL should
    // have been removed
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]");

    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
                                     nullptr, nullptr));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2 ]");

    // Case 2: Delete followed by another delete
    ASSERT_OK(Delete(1, "foo"));
    ASSERT_OK(Delete(1, "foo"));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, DEL, v2 ]");
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v2 ]");
    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
                                     nullptr, nullptr));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");

    // Case 3: Put followed by a delete
    ASSERT_OK(Put(1, "foo", "v3"));
    ASSERT_OK(Delete(1, "foo"));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v3 ]");
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL ]");
    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
                                     nullptr, nullptr));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");

    // Case 4: Put followed by another Put
    ASSERT_OK(Put(1, "foo", "v4"));
    ASSERT_OK(Put(1, "foo", "v5"));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5, v4 ]");
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");
    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
                                     nullptr, nullptr));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");

    // clear database
    ASSERT_OK(Delete(1, "foo"));
    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
                                     nullptr, nullptr));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");

    // Case 5: Put followed by snapshot followed by another Put
    // Both puts should remain.
    ASSERT_OK(Put(1, "foo", "v6"));
    const Snapshot* snapshot = db_->GetSnapshot();
    ASSERT_OK(Put(1, "foo", "v7"));
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v7, v6 ]");
    db_->ReleaseSnapshot(snapshot);

    // clear database
    ASSERT_OK(Delete(1, "foo"));
    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
                                     nullptr, nullptr));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");

    // Case 6: snapshot followed by a put followed by another Put
    // Only the last put should remain.
    const Snapshot* snapshot1 = db_->GetSnapshot();
    ASSERT_OK(Put(1, "foo", "v8"));
    ASSERT_OK(Put(1, "foo", "v9"));
    ASSERT_OK(Flush(1));
    ASSERT_EQ(AllEntriesFor("foo", 1), "[ v9 ]");
    db_->ReleaseSnapshot(snapshot1);
  } while (ChangeCompactOptions());
}
|
|
|
|
// Writes one key into each of eight column families and then flushes them one
// at a time, checking after every flush that exactly one more table file is
// listed for the database.
TEST_F(DBBasicTest, FlushOneColumnFamily) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
                         "alyosha", "popovich"},
                        options);

  ASSERT_OK(Put(0, "Default", "Default"));
  ASSERT_OK(Put(1, "pikachu", "pikachu"));
  ASSERT_OK(Put(2, "ilya", "ilya"));
  ASSERT_OK(Put(3, "muromec", "muromec"));
  ASSERT_OK(Put(4, "dobrynia", "dobrynia"));
  ASSERT_OK(Put(5, "nikitich", "nikitich"));
  ASSERT_OK(Put(6, "alyosha", "alyosha"));
  ASSERT_OK(Put(7, "popovich", "popovich"));

  for (int i = 0; i < 8; ++i) {
    // Surface a failed flush directly instead of as a confusing file-count
    // mismatch on the next line.
    ASSERT_OK(Flush(i));
    auto tables = ListTableFiles(env_, dbname_);
    ASSERT_EQ(tables.size(), i + 1U);
  }
}
|
|
|
|
// Looks up six keys in column family 1 with a single MultiGet(): four that
// exist, one that was deleted and one that was never written. Checks the
// returned values, the per-key statuses and the multiget_read_bytes perf
// counter.
TEST_F(DBBasicTest, MultiGetSimple) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    SetPerfLevel(kEnableCount);

    ASSERT_OK(Put(1, "k1", "v1"));
    ASSERT_OK(Put(1, "k2", "v2"));
    ASSERT_OK(Put(1, "k3", "v3"));
    ASSERT_OK(Put(1, "k4", "v4"));
    ASSERT_OK(Delete(1, "k4"));
    ASSERT_OK(Put(1, "k5", "v5"));
    ASSERT_OK(Delete(1, "no_key"));

    std::vector<Slice> lookup_keys({"k1", "k2", "k3", "k4", "k5", "no_key"});
    std::vector<ColumnFamilyHandle*> lookup_cfs(lookup_keys.size(),
                                                handles_[1]);
    // Pre-filled with junk and deliberately oversized: the call is expected
    // to resize the vector to the number of keys.
    std::vector<std::string> results(20, "Temporary data to be overwritten");

    get_perf_context()->Reset();
    std::vector<Status> statuses =
        db_->MultiGet(ReadOptions(), lookup_cfs, lookup_keys, &results);

    ASSERT_EQ(results.size(), lookup_keys.size());
    ASSERT_EQ(results[0], "v1");
    ASSERT_EQ(results[1], "v2");
    ASSERT_EQ(results[2], "v3");
    ASSERT_EQ(results[4], "v5");
    // four kv pairs * two bytes per value
    ASSERT_EQ(8, static_cast<int>(get_perf_context()->multiget_read_bytes));

    ASSERT_OK(statuses[0]);
    ASSERT_OK(statuses[1]);
    ASSERT_OK(statuses[2]);
    ASSERT_TRUE(statuses[3].IsNotFound());
    ASSERT_OK(statuses[4]);
    ASSERT_TRUE(statuses[5].IsNotFound());
    SetPerfLevel(kDisable);
  } while (ChangeCompactOptions());
}
|
|
|
|
// MultiGet() edge cases: an empty key set against the current database, an
// empty key set against a freshly recreated database, and two keys looked up
// in that empty database (both must be NotFound).
TEST_F(DBBasicTest, MultiGetEmpty) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());

    // Empty Key Set
    std::vector<Slice> lookup_keys;
    std::vector<std::string> results;
    std::vector<ColumnFamilyHandle*> lookup_cfs;
    std::vector<Status> statuses =
        db_->MultiGet(ReadOptions(), lookup_cfs, lookup_keys, &results);
    ASSERT_EQ(statuses.size(), 0U);

    // Empty Database, Empty Key Set
    Options opts = CurrentOptions();
    opts.create_if_missing = true;
    DestroyAndReopen(opts);
    CreateAndReopenWithCF({"pikachu"}, opts);
    statuses = db_->MultiGet(ReadOptions(), lookup_cfs, lookup_keys, &results);
    ASSERT_EQ(statuses.size(), 0U);

    // Empty Database, Search for Keys
    lookup_keys.resize(2);
    lookup_keys[0] = "a";
    lookup_keys[1] = "b";
    lookup_cfs.push_back(handles_[0]);
    lookup_cfs.push_back(handles_[1]);
    statuses = db_->MultiGet(ReadOptions(), lookup_cfs, lookup_keys, &results);
    ASSERT_EQ(static_cast<int>(statuses.size()), 2);
    ASSERT_TRUE(statuses[0].IsNotFound() && statuses[1].IsNotFound());
  } while (ChangeCompactOptions());
}
|
|
|
|
// Writes one table file per checksum type (0 .. kxxHash64) and then, for each
// checksum type configured as the current setting, reopens the database and
// reads back every key from every file.
TEST_F(DBBasicTest, ChecksumTest) {
  BlockBasedTableOptions table_opts;
  Options opts = CurrentOptions();
  // change when new checksum type added
  const int last_checksum = static_cast<int>(kxxHash64);
  const int kNumPerFile = 2;
  const int total_keys = (last_checksum + 1) * kNumPerFile;

  // generate one table with each type of checksum
  for (int type = 0; type <= last_checksum; ++type) {
    table_opts.checksum = static_cast<ChecksumType>(type);
    opts.table_factory.reset(NewBlockBasedTableFactory(table_opts));
    Reopen(opts);
    const int first_key = type * kNumPerFile;
    for (int offset = 0; offset < kNumPerFile; ++offset) {
      const std::string kv = Key(first_key + offset);
      ASSERT_OK(Put(kv, kv));
    }
    ASSERT_OK(Flush());
  }

  // with each valid checksum type setting...
  for (int type = 0; type <= last_checksum; ++type) {
    table_opts.checksum = static_cast<ChecksumType>(type);
    opts.table_factory.reset(NewBlockBasedTableFactory(table_opts));
    Reopen(opts);
    // verify every type of checksum (should be regardless of that setting)
    for (int k = 0; k < total_keys; ++k) {
      const std::string expected = Key(k);
      ASSERT_EQ(expected, Get(expected));
    }
  }
}
|
|
|
|
// On Windows you can have either memory mapped file or a file
|
|
// with unbuffered access. So this asserts and does not make
|
|
// sense to run
|
|
#ifndef OS_WIN
|
|
// Verifies that reopening with both use_direct_reads and allow_mmap_reads set
// fails, while the other combinations tried here succeed. Returns right away
// when memory-mapped access is not supported.
TEST_F(DBBasicTest, MmapAndBufferOptions) {
  if (!IsMemoryMappedAccessSupported()) {
    return;
  }
  Options opts = CurrentOptions();

  // Direct reads combined with mmap reads: must be rejected.
  opts.use_direct_reads = true;
  opts.allow_mmap_reads = true;
  ASSERT_NOK(TryReopen(opts));

  // All other combinations are acceptable
  opts.use_direct_reads = false;
  ASSERT_OK(TryReopen(opts));

  const bool direct_io_available = IsDirectIOSupported();
  if (direct_io_available) {
    opts.use_direct_reads = true;
    opts.allow_mmap_reads = false;
    ASSERT_OK(TryReopen(opts));
  }

  // allow_mmap_reads keeps whatever value the branch above left it with.
  opts.use_direct_reads = false;
  ASSERT_OK(TryReopen(opts));
}
|
|
#endif
|
|
|
|
class TestEnv : public EnvWrapper {
|
|
public:
|
|
explicit TestEnv(Env* base_env) : EnvWrapper(base_env), close_count(0) {}
|
|
|
|
class TestLogger : public Logger {
|
|
public:
|
|
using Logger::Logv;
|
|
explicit TestLogger(TestEnv* env_ptr) : Logger() { env = env_ptr; }
|
|
~TestLogger() override {
|
|
if (!closed_) {
|
|
CloseHelper();
|
|
}
|
|
}
|
|
void Logv(const char* /*format*/, va_list /*ap*/) override {}
|
|
|
|
protected:
|
|
Status CloseImpl() override { return CloseHelper(); }
|
|
|
|
private:
|
|
Status CloseHelper() {
|
|
env->CloseCountInc();
|
|
;
|
|
return Status::IOError();
|
|
}
|
|
TestEnv* env;
|
|
};
|
|
|
|
void CloseCountInc() { close_count++; }
|
|
|
|
int GetCloseCount() { return close_count; }
|
|
|
|
Status NewLogger(const std::string& /*fname*/,
|
|
std::shared_ptr<Logger>* result) override {
|
|
result->reset(new TestLogger(this));
|
|
return Status::OK();
|
|
}
|
|
|
|
private:
|
|
int close_count;
|
|
};
|
|
|
|
// Checks how often the info logger is closed when the DB is closed
// explicitly, destroyed without Close(), or handed a caller-owned logger.
TEST_F(DBBasicTest, DBClose) {
  const std::string db_path = test::PerThreadDBPath("db_close_test");
  Options options = GetDefaultOptions();
  ASSERT_OK(DestroyDB(db_path, options));

  std::unique_ptr<TestEnv> env_owner(new TestEnv(env_));
  TestEnv* const test_env = env_owner.get();
  options.create_if_missing = true;
  options.env = test_env;

  DB* db = nullptr;
  Status status = DB::Open(options, db_path, &db);
  ASSERT_OK(status);
  ASSERT_TRUE(db != nullptr);

  // Explicit Close(): the env-created logger is closed once and its IOError
  // is returned; deleting the DB afterwards does not close it again.
  status = db->Close();
  ASSERT_EQ(test_env->GetCloseCount(), 1);
  ASSERT_EQ(status, Status::IOError());

  delete db;
  ASSERT_EQ(test_env->GetCloseCount(), 1);

  // Do not call DB::Close() and ensure our logger Close() still gets called
  status = DB::Open(options, db_path, &db);
  ASSERT_OK(status);
  ASSERT_TRUE(db != nullptr);
  delete db;
  ASSERT_EQ(test_env->GetCloseCount(), 2);

  // Provide our own logger and ensure DB::Close() does not close it
  options.info_log.reset(new TestEnv::TestLogger(test_env));
  options.create_if_missing = false;
  status = DB::Open(options, db_path, &db);
  ASSERT_OK(status);
  ASSERT_TRUE(db != nullptr);

  status = db->Close();
  ASSERT_EQ(status, Status::OK());
  delete db;
  ASSERT_EQ(test_env->GetCloseCount(), 2);
  // Dropping the last reference destroys the logger, which closes itself.
  options.info_log.reset();
  ASSERT_EQ(test_env->GetCloseCount(), 3);
}
|
|
|
|
// Close() must report a non-OK status when the filesystem is deactivated
// while there is still unflushed data.
TEST_F(DBBasicTest, DBCloseFlushError) {
  std::unique_ptr<FaultInjectionTestEnv> faulty_env(
      new FaultInjectionTestEnv(env_));
  Options options = GetDefaultOptions();
  options.env = faulty_env.get();
  options.create_if_missing = true;
  options.manual_wal_flush = true;
  options.write_buffer_size = 100;
  Reopen(options);

  ASSERT_OK(Put("key1", "value1"));
  ASSERT_OK(Put("key2", "value2"));
  ASSERT_OK(dbfull()->TEST_SwitchMemtable());
  ASSERT_OK(Put("key3", "value3"));

  // Fail all filesystem activity only for the duration of Close().
  faulty_env->SetFilesystemActive(false);
  Status close_status = dbfull()->Close();
  faulty_env->SetFilesystemActive(true);
  ASSERT_NE(close_status, Status::OK());

  Destroy(options);
}
|
|
|
|
class DBMultiGetTestWithParam : public DBBasicTest,
|
|
public testing::WithParamInterface<bool> {};
|
|
|
|
// Runs MultiGet() over 24 keys spread across 8 column families while a
// sync-point callback rewrites every key (and flushes some CFs) after the
// second super-version reference. MultiGet() must return the rewritten
// ("_2") values, and no column family may be left with its thread-local
// super version marked in-use or obsolete afterwards.
TEST_P(DBMultiGetTestWithParam, MultiGetMultiCF) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
                         "alyosha", "popovich"},
                        options);
  // <CF, key, value> tuples
  std::vector<std::tuple<int, std::string, std::string>> cf_kv_vec;
  static const int num_keys = 24;
  cf_kv_vec.reserve(num_keys);

  for (int i = 0; i < num_keys; ++i) {
    int cf = i / 3;
    // Three distinct keys per column family. Deriving this from the constant
    // 1 instead of i would make all three entries of a CF share one key.
    int cf_key = i % 3;
    cf_kv_vec.emplace_back(std::make_tuple(
        cf, "cf" + std::to_string(cf) + "_key_" + std::to_string(cf_key),
        "cf" + std::to_string(cf) + "_val_" + std::to_string(cf_key)));
    ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]),
                  std::get<2>(cf_kv_vec[i])));
  }

  int get_sv_count = 0;
  ROCKSDB_NAMESPACE::DBImpl* db = reinterpret_cast<DBImpl*>(db_);
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
        if (++get_sv_count == 2) {
          // After MultiGet refs a couple of CFs, flush all CFs so MultiGet
          // is forced to repeat the process
          // NOTE(review): `i % 8 == 0` only holds for i = 0, 8, 16, i.e. only
          // CFs 0, 2 and 5 are flushed, not all of them -- confirm intent.
          for (int i = 0; i < num_keys; ++i) {
            int cf = i / 3;
            int cf_key = i % 8;
            if (cf_key == 0) {
              ASSERT_OK(Flush(cf));
            }
            ASSERT_OK(Put(std::get<0>(cf_kv_vec[i]), std::get<1>(cf_kv_vec[i]),
                          std::get<2>(cf_kv_vec[i]) + "_2"));
          }
        }
        if (get_sv_count == 11) {
          // At this point every CF's local super version is expected to be
          // marked as in use.
          for (int i = 0; i < 8; ++i) {
            auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
                            db->GetColumnFamilyHandle(i))
                            ->cfd();
            ASSERT_EQ(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
          }
        }
      });
  ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();

  std::vector<int> cfs;
  std::vector<std::string> keys;
  std::vector<std::string> values;

  for (int i = 0; i < num_keys; ++i) {
    cfs.push_back(std::get<0>(cf_kv_vec[i]));
    keys.push_back(std::get<1>(cf_kv_vec[i]));
  }

  // Full lookup: every key must come back with the rewritten value.
  values = MultiGet(cfs, keys, nullptr, GetParam());
  ASSERT_EQ(values.size(), num_keys);
  for (unsigned int j = 0; j < values.size(); ++j) {
    ASSERT_EQ(values[j], std::get<2>(cf_kv_vec[j]) + "_2");
  }

  // Partial lookup with column families in ascending order.
  keys.clear();
  cfs.clear();
  cfs.push_back(std::get<0>(cf_kv_vec[0]));
  keys.push_back(std::get<1>(cf_kv_vec[0]));
  cfs.push_back(std::get<0>(cf_kv_vec[3]));
  keys.push_back(std::get<1>(cf_kv_vec[3]));
  cfs.push_back(std::get<0>(cf_kv_vec[4]));
  keys.push_back(std::get<1>(cf_kv_vec[4]));
  values = MultiGet(cfs, keys, nullptr, GetParam());
  ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[0]) + "_2");
  ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[3]) + "_2");
  ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[4]) + "_2");

  // Partial lookup with column families in descending order.
  keys.clear();
  cfs.clear();
  cfs.push_back(std::get<0>(cf_kv_vec[7]));
  keys.push_back(std::get<1>(cf_kv_vec[7]));
  cfs.push_back(std::get<0>(cf_kv_vec[6]));
  keys.push_back(std::get<1>(cf_kv_vec[6]));
  cfs.push_back(std::get<0>(cf_kv_vec[1]));
  keys.push_back(std::get<1>(cf_kv_vec[1]));
  values = MultiGet(cfs, keys, nullptr, GetParam());
  ASSERT_EQ(values[0], std::get<2>(cf_kv_vec[7]) + "_2");
  ASSERT_EQ(values[1], std::get<2>(cf_kv_vec[6]) + "_2");
  ASSERT_EQ(values[2], std::get<2>(cf_kv_vec[1]) + "_2");

  // No CF may be left with its local super version in use or obsolete.
  for (int cf = 0; cf < 8; ++cf) {
    auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
                    reinterpret_cast<DBImpl*>(db_)->GetColumnFamilyHandle(cf))
                    ->cfd();
    ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
    ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVObsolete);
  }
}
|
|
|
|
// Keeps invalidating the super versions MultiGet() has referenced (by
// flushing and rewriting every CF after each second reference) until the
// "DBImpl::MultiGet::LastTry" sync point fires, then checks that the values
// written during the final retry are returned.
TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFMutex) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
                         "alyosha", "popovich"},
                        options);

  for (int cf = 0; cf < 8; ++cf) {
    const std::string cf_tag = "cf" + std::to_string(cf);
    ASSERT_OK(Put(cf, cf_tag + "_key", cf_tag + "_val"));
  }

  int get_sv_count = 0;
  int retries = 0;
  bool last_try = false;
  auto* sync_point = ROCKSDB_NAMESPACE::SyncPoint::GetInstance();
  sync_point->SetCallBack("DBImpl::MultiGet::LastTry", [&](void* /*arg*/) {
    last_try = true;
    sync_point->DisableProcessing();
  });
  sync_point->SetCallBack("DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
    if (last_try) {
      return;
    }
    ++get_sv_count;
    if (get_sv_count != 2) {
      return;
    }
    // Second reference of this attempt: start counting afresh and rewrite
    // every CF so the attempt has to be repeated.
    ++retries;
    get_sv_count = 0;
    for (int cf = 0; cf < 8; ++cf) {
      const std::string cf_tag = "cf" + std::to_string(cf);
      ASSERT_OK(Flush(cf));
      ASSERT_OK(
          Put(cf, cf_tag + "_key", cf_tag + "_val" + std::to_string(retries)));
    }
  });
  sync_point->EnableProcessing();

  std::vector<int> cfs;
  std::vector<std::string> keys;
  std::vector<std::string> values;

  for (int cf = 0; cf < 8; ++cf) {
    cfs.push_back(cf);
    keys.push_back("cf" + std::to_string(cf) + "_key");
  }

  values = MultiGet(cfs, keys, nullptr, GetParam());
  ASSERT_TRUE(last_try);
  ASSERT_EQ(values.size(), 8);
  const std::string retry_suffix = "_val" + std::to_string(retries);
  for (unsigned int j = 0; j < values.size(); ++j) {
    ASSERT_EQ(values[j], "cf" + std::to_string(j) + retry_suffix);
  }
  for (int cf = 0; cf < 8; ++cf) {
    auto* db_impl = reinterpret_cast<DBImpl*>(db_);
    auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
                    db_impl->GetColumnFamilyHandle(cf))
                    ->cfd();
    ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
  }
}
|
|
|
|
// MultiGet() with an explicit snapshot must keep returning the values that
// were visible at snapshot time, even though a sync-point callback flushes
// and overwrites every column family in the middle of the lookup.
TEST_P(DBMultiGetTestWithParam, MultiGetMultiCFSnapshot) {
  Options options = CurrentOptions();
  CreateAndReopenWithCF({"pikachu", "ilya", "muromec", "dobrynia", "nikitich",
                         "alyosha", "popovich"},
                        options);

  for (int cf = 0; cf < 8; ++cf) {
    const std::string cf_tag = "cf" + std::to_string(cf);
    ASSERT_OK(Put(cf, cf_tag + "_key", cf_tag + "_val"));
  }

  int get_sv_count = 0;
  ROCKSDB_NAMESPACE::DBImpl* db = reinterpret_cast<DBImpl*>(db_);
  auto* sync_point = ROCKSDB_NAMESPACE::SyncPoint::GetInstance();
  sync_point->SetCallBack("DBImpl::MultiGet::AfterRefSV", [&](void* /*arg*/) {
    ++get_sv_count;
    if (get_sv_count == 2) {
      // Rewrite every CF after the second super-version reference.
      for (int cf = 0; cf < 8; ++cf) {
        const std::string cf_tag = "cf" + std::to_string(cf);
        ASSERT_OK(Flush(cf));
        ASSERT_OK(Put(cf, cf_tag + "_key", cf_tag + "_val2"));
      }
    }
    if (get_sv_count == 8) {
      // Once all eight CFs have been referenced, each local super version
      // must be either in use or already obsolete.
      for (int cf = 0; cf < 8; ++cf) {
        auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
                        db->GetColumnFamilyHandle(cf))
                        ->cfd();
        ASSERT_TRUE(
            (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVInUse) ||
            (cfd->TEST_GetLocalSV()->Get() == SuperVersion::kSVObsolete));
      }
    }
  });
  sync_point->EnableProcessing();

  std::vector<int> cfs;
  std::vector<std::string> keys;
  std::vector<std::string> values;

  for (int cf = 0; cf < 8; ++cf) {
    cfs.push_back(cf);
    keys.push_back("cf" + std::to_string(cf) + "_key");
  }

  const Snapshot* snapshot = db_->GetSnapshot();
  values = MultiGet(cfs, keys, snapshot, GetParam());
  db_->ReleaseSnapshot(snapshot);
  ASSERT_EQ(values.size(), 8);
  for (unsigned int j = 0; j < values.size(); ++j) {
    ASSERT_EQ(values[j], "cf" + std::to_string(j) + "_val");
  }
  for (int cf = 0; cf < 8; ++cf) {
    auto* db_impl = reinterpret_cast<DBImpl*>(db_);
    auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(
                    db_impl->GetColumnFamilyHandle(cf))
                    ->cfd();
    ASSERT_NE(cfd->TEST_GetLocalSV()->Get(), SuperVersion::kSVInUse);
  }
}
|
|
|
|
// Run every DBMultiGetTestWithParam test with both values of the bool
// parameter.
INSTANTIATE_TEST_CASE_P(DBMultiGetTestWithParam, DBMultiGetTestWithParam,
                        ::testing::Bool());
|
|
|
|
// Batched single-CF MultiGet() with the keys passed in unsorted order (the
// final `false` argument): present keys return their values, while deleted or
// never-written keys report NotFound.
TEST_F(DBBasicTest, MultiGetBatchedSimpleUnsorted) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    SetPerfLevel(kEnableCount);
    ASSERT_OK(Put(1, "k1", "v1"));
    ASSERT_OK(Put(1, "k2", "v2"));
    ASSERT_OK(Put(1, "k3", "v3"));
    ASSERT_OK(Put(1, "k4", "v4"));
    ASSERT_OK(Delete(1, "k4"));
    ASSERT_OK(Put(1, "k5", "v5"));
    ASSERT_OK(Delete(1, "no_key"));

    get_perf_context()->Reset();

    std::vector<Slice> keys({"no_key", "k5", "k4", "k3", "k2", "k1"});
    std::vector<PinnableSlice> values(keys.size());
    std::vector<Status> s(keys.size());

    db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
                  values.data(), s.data(), false);

    // Copies the i-th result into a std::string for comparison.
    auto value_at = [&values](size_t i) {
      return std::string(values[i].data(), values[i].size());
    };

    ASSERT_EQ(values.size(), keys.size());
    ASSERT_EQ(value_at(5), "v1");
    ASSERT_EQ(value_at(4), "v2");
    ASSERT_EQ(value_at(3), "v3");
    ASSERT_EQ(value_at(1), "v5");
    // four kv pairs * two bytes per value
    ASSERT_EQ(8, static_cast<int>(get_perf_context()->multiget_read_bytes));

    ASSERT_TRUE(s[0].IsNotFound());
    ASSERT_OK(s[1]);
    ASSERT_TRUE(s[2].IsNotFound());
    ASSERT_OK(s[3]);
    ASSERT_OK(s[4]);
    ASSERT_OK(s[5]);

    SetPerfLevel(kDisable);
  } while (ChangeCompactOptions());
}
|
|
|
|
// Batched single-CF MultiGet() with the keys passed already sorted (the final
// `true` argument): present keys return their values, while deleted or
// never-written keys report NotFound.
TEST_F(DBBasicTest, MultiGetBatchedSimpleSorted) {
  do {
    CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
    SetPerfLevel(kEnableCount);
    ASSERT_OK(Put(1, "k1", "v1"));
    ASSERT_OK(Put(1, "k2", "v2"));
    ASSERT_OK(Put(1, "k3", "v3"));
    ASSERT_OK(Put(1, "k4", "v4"));
    ASSERT_OK(Delete(1, "k4"));
    ASSERT_OK(Put(1, "k5", "v5"));
    ASSERT_OK(Delete(1, "no_key"));

    get_perf_context()->Reset();

    std::vector<Slice> keys({"k1", "k2", "k3", "k4", "k5", "no_key"});
    std::vector<PinnableSlice> values(keys.size());
    std::vector<Status> s(keys.size());

    db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
                  values.data(), s.data(), true);

    // Copies the i-th result into a std::string for comparison.
    auto value_at = [&values](size_t i) {
      return std::string(values[i].data(), values[i].size());
    };

    ASSERT_EQ(values.size(), keys.size());
    ASSERT_EQ(value_at(0), "v1");
    ASSERT_EQ(value_at(1), "v2");
    ASSERT_EQ(value_at(2), "v3");
    ASSERT_EQ(value_at(4), "v5");
    // four kv pairs * two bytes per value
    ASSERT_EQ(8, static_cast<int>(get_perf_context()->multiget_read_bytes));

    ASSERT_OK(s[0]);
    ASSERT_OK(s[1]);
    ASSERT_OK(s[2]);
    ASSERT_TRUE(s[3].IsNotFound());
    ASSERT_OK(s[4]);
    ASSERT_TRUE(s[5].IsNotFound());

    SetPerfLevel(kDisable);
  } while (ChangeCompactOptions());
}
|
|
|
|
// Layers four generations of values (files moved to L2, files moved to L1,
// flushed files left in place, unflushed writes) over keys key_0..key_127 and
// checks that MultiGet() returns the newest generation for each of
// key_64..key_79.
TEST_F(DBBasicTest, MultiGetBatchedMultiLevel) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  Reopen(options);

  // Writes "<prefix><i>" to every `step`-th key in [0, 128), flushing after
  // each batch of 8 writes and once more for a trailing partial batch. Flush
  // failures are reported instead of being silently dropped.
  auto write_and_flush = [&](int step, const std::string& prefix) {
    int pending = 0;
    for (int i = 0; i < 128; i += step) {
      ASSERT_OK(Put("key_" + std::to_string(i), prefix + std::to_string(i)));
      if (++pending == 8) {
        ASSERT_OK(Flush());
        pending = 0;
      }
    }
    if (pending > 0) {
      ASSERT_OK(Flush());
    }
  };

  // ASSERT_NO_FATAL_FAILURE stops the test if an assertion inside the helper
  // fails; a bare call would only return from the lambda.
  ASSERT_NO_FATAL_FAILURE(write_and_flush(1, "val_l2_"));
  MoveFilesToLevel(2);

  ASSERT_NO_FATAL_FAILURE(write_and_flush(3, "val_l1_"));
  MoveFilesToLevel(1);

  ASSERT_NO_FATAL_FAILURE(write_and_flush(5, "val_l0_"));

  // Newest generation: written but never flushed.
  for (int i = 0; i < 128; i += 9) {
    ASSERT_OK(Put("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
  }

  std::vector<std::string> keys;
  std::vector<std::string> values;

  for (int i = 64; i < 80; ++i) {
    keys.push_back("key_" + std::to_string(i));
  }

  values = MultiGet(keys, nullptr);
  ASSERT_EQ(values.size(), 16);
  for (unsigned int j = 0; j < values.size(); ++j) {
    int key = j + 64;
    // The most recently written generation wins.
    if (key % 9 == 0) {
      ASSERT_EQ(values[j], "val_mem_" + std::to_string(key));
    } else if (key % 5 == 0) {
      ASSERT_EQ(values[j], "val_l0_" + std::to_string(key));
    } else if (key % 3 == 0) {
      ASSERT_EQ(values[j], "val_l1_" + std::to_string(key));
    } else {
      ASSERT_EQ(values[j], "val_l2_" + std::to_string(key));
    }
  }
}
|
|
|
|
// Same layering as MultiGetBatchedMultiLevel, but every generation after the
// base one is written with Merge() under a string-append merge operator, so
// MultiGet() must return the comma-joined concatenation of all generations
// that touched a key.
TEST_F(DBBasicTest, MultiGetBatchedMultiLevelMerge) {
  Options options = CurrentOptions();
  options.disable_auto_compactions = true;
  options.merge_operator = MergeOperators::CreateStringAppendOperator();
  BlockBasedTableOptions bbto;
  bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  Reopen(options);

  // Applies "<prefix><i>" to every `step`-th key in [0, 128) with Put() or
  // Merge(), flushing after each batch of 8 writes and once more for a
  // trailing partial batch. Flush failures are reported instead of dropped.
  auto write_and_flush = [&](int step, const std::string& prefix,
                             bool use_merge) {
    int pending = 0;
    for (int i = 0; i < 128; i += step) {
      const std::string key = "key_" + std::to_string(i);
      const std::string value = prefix + std::to_string(i);
      if (use_merge) {
        ASSERT_OK(Merge(key, value));
      } else {
        ASSERT_OK(Put(key, value));
      }
      if (++pending == 8) {
        ASSERT_OK(Flush());
        pending = 0;
      }
    }
    if (pending > 0) {
      ASSERT_OK(Flush());
    }
  };

  // ASSERT_NO_FATAL_FAILURE stops the test if an assertion inside the helper
  // fails; a bare call would only return from the lambda.
  ASSERT_NO_FATAL_FAILURE(write_and_flush(1, "val_l2_", false));
  MoveFilesToLevel(2);

  ASSERT_NO_FATAL_FAILURE(write_and_flush(3, "val_l1_", true));
  MoveFilesToLevel(1);

  ASSERT_NO_FATAL_FAILURE(write_and_flush(5, "val_l0_", true));

  // Newest operands: merged but never flushed.
  for (int i = 0; i < 128; i += 9) {
    ASSERT_OK(
        Merge("key_" + std::to_string(i), "val_mem_" + std::to_string(i)));
  }

  std::vector<std::string> keys;
  std::vector<std::string> values;

  for (int i = 32; i < 80; ++i) {
    keys.push_back("key_" + std::to_string(i));
  }

  values = MultiGet(keys, nullptr);
  ASSERT_EQ(values.size(), keys.size());
  for (unsigned int j = 0; j < 48; ++j) {
    int key = j + 32;
    // Expected value: base Put followed by each merge operand, oldest first.
    std::string value;
    value.append("val_l2_" + std::to_string(key));
    if (key % 3 == 0) {
      value.append(",");
      value.append("val_l1_" + std::to_string(key));
    }
    if (key % 5 == 0) {
      value.append(",");
      value.append("val_l0_" + std::to_string(key));
    }
    if (key % 9 == 0) {
      value.append(",");
      value.append("val_mem_" + std::to_string(key));
    }
    ASSERT_EQ(values[j], value);
  }
}
|
|
|
|
// Test class for batched MultiGet with prefix extractor
|
|
// Param bool - If true, use partitioned filters
|
|
// If false, use full filter block
|
|
class MultiGetPrefixExtractorTest : public DBBasicTest,
|
|
public ::testing::WithParamInterface<bool> {
|
|
};
|
|
|
|
// MultiGet() with a 2-byte fixed prefix extractor and prefix-only bloom
// filters: checks the returned values and the bloom hit/miss perf counters,
// first against the memtable and then against a flushed SST file.
TEST_P(MultiGetPrefixExtractorTest, Batched) {
  Options options = CurrentOptions();
  options.prefix_extractor.reset(NewFixedPrefixTransform(2));
  options.memtable_prefix_bloom_size_ratio = 10;

  BlockBasedTableOptions bbto;
  const bool partitioned_filters = GetParam();
  if (partitioned_filters) {
    bbto.index_type = BlockBasedTableOptions::IndexType::kTwoLevelIndexSearch;
    bbto.partition_filters = true;
  }
  bbto.filter_policy.reset(NewBloomFilterPolicy(10, false));
  bbto.whole_key_filtering = false;
  bbto.cache_index_and_filter_blocks = false;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  Reopen(options);

  SetPerfLevel(kEnableCount);
  get_perf_context()->Reset();

  // First key is not in the prefix_extractor domain
  ASSERT_OK(Put("k", "v0"));
  ASSERT_OK(Put("kk1", "v1"));
  ASSERT_OK(Put("kk2", "v2"));
  ASSERT_OK(Put("kk3", "v3"));
  ASSERT_OK(Put("kk4", "v4"));

  // Lookup served from the memtable; the last two keys were never written.
  std::vector<std::string> mem_keys(
      {"k", "kk1", "kk2", "kk3", "kk4", "rofl", "lmho"});
  std::vector<std::string> inmem_values = MultiGet(mem_keys, nullptr);
  for (size_t i = 0; i < 5; ++i) {
    ASSERT_EQ(inmem_values[i], "v" + std::to_string(i));
  }
  ASSERT_EQ(get_perf_context()->bloom_memtable_miss_count, 2);
  ASSERT_EQ(get_perf_context()->bloom_memtable_hit_count, 5);
  ASSERT_OK(Flush());

  // Same lookup after the flush, now served from the SST file.
  std::vector<std::string> keys({"k", "kk1", "kk2", "kk3", "kk4"});
  get_perf_context()->Reset();
  std::vector<std::string> values = MultiGet(keys, nullptr);
  for (size_t i = 0; i < 5; ++i) {
    ASSERT_EQ(values[i], "v" + std::to_string(i));
  }
  // Filter hits for 4 in-domain keys
  ASSERT_EQ(get_perf_context()->bloom_sst_hit_count, 4);
}
|
|
|
|
// Run MultiGetPrefixExtractorTest with partitioned filters both off and on.
INSTANTIATE_TEST_CASE_P(MultiGetPrefix, MultiGetPrefixExtractorTest,
                        testing::Bool());
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
class DBMultiGetRowCacheTest : public DBBasicTest,
|
|
public ::testing::WithParamInterface<bool> {};
|
|
|
|
// Batched MultiGet() with the row cache enabled: issues two overlapping
// lookups over two SST files and checks the returned values, statuses, bytes
// read and the number of row-cache hits, with and without snapshots.
TEST_P(DBMultiGetRowCacheTest, MultiGetBatched) {
  do {
    option_config_ = kRowCache;
    Options options = CurrentOptions();
    options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
    CreateAndReopenWithCF({"pikachu"}, options);
    SetPerfLevel(kEnableCount);
    ASSERT_OK(Put(1, "k1", "v1"));
    ASSERT_OK(Put(1, "k2", "v2"));
    ASSERT_OK(Put(1, "k3", "v3"));
    ASSERT_OK(Put(1, "k4", "v4"));
    // First SST file: k1..k4.
    ASSERT_OK(Flush(1));
    ASSERT_OK(Put(1, "k5", "v5"));
    const Snapshot* snap1 = dbfull()->GetSnapshot();
    ASSERT_OK(Delete(1, "k4"));
    // Second SST file: k5 and the deletion of k4.
    ASSERT_OK(Flush(1));
    const Snapshot* snap2 = dbfull()->GetSnapshot();

    get_perf_context()->Reset();

    std::vector<Slice> keys({"no_key", "k5", "k4", "k3", "k1"});
    std::vector<PinnableSlice> values(keys.size());
    std::vector<Status> s(keys.size());

    // Copies the i-th result into a std::string for comparison.
    auto value_at = [&values](size_t i) {
      return std::string(values[i].data(), values[i].size());
    };

    ReadOptions ro;
    bool use_snapshots = GetParam();
    if (use_snapshots) {
      ro.snapshot = snap2;
    }
    db_->MultiGet(ro, handles_[1], keys.size(), keys.data(), values.data(),
                  s.data(), false);

    ASSERT_EQ(values.size(), keys.size());
    ASSERT_EQ(value_at(4), "v1");
    ASSERT_EQ(value_at(3), "v3");
    ASSERT_EQ(value_at(1), "v5");
    // three kv pairs * two bytes per value
    ASSERT_EQ(6, static_cast<int>(get_perf_context()->multiget_read_bytes));

    ASSERT_TRUE(s[0].IsNotFound());
    ASSERT_OK(s[1]);
    ASSERT_TRUE(s[2].IsNotFound());
    ASSERT_OK(s[3]);
    ASSERT_OK(s[4]);

    // Call MultiGet() again with some intersection with the previous set of
    // keys. Those should already be in the row cache.
    keys.assign({"no_key", "k5", "k3", "k2"});
    for (size_t i = 0; i < keys.size(); ++i) {
      values[i].Reset();
      s[i] = Status::OK();
    }
    get_perf_context()->Reset();

    if (use_snapshots) {
      ro.snapshot = snap1;
    }
    // NOTE(review): this lookup passes a default ReadOptions(), so the
    // snap1 assignment above has no effect on it. Left as is because the
    // ROW_CACHE_HIT expectations below were written against this behavior --
    // confirm whether `ro` was intended here.
    db_->MultiGet(ReadOptions(), handles_[1], keys.size(), keys.data(),
                  values.data(), s.data(), false);

    ASSERT_EQ(value_at(3), "v2");
    ASSERT_EQ(value_at(2), "v3");
    ASSERT_EQ(value_at(1), "v5");
    // three kv pairs * two bytes per value
    ASSERT_EQ(6, static_cast<int>(get_perf_context()->multiget_read_bytes));

    ASSERT_TRUE(s[0].IsNotFound());
    ASSERT_OK(s[1]);
    ASSERT_OK(s[2]);
    ASSERT_OK(s[3]);
    if (use_snapshots) {
      // Only reads from the first SST file would have been cached, since
      // snapshot seq no is > fd.largest_seqno
      ASSERT_EQ(1, TestGetTickerCount(options, ROW_CACHE_HIT));
    } else {
      ASSERT_EQ(2, TestGetTickerCount(options, ROW_CACHE_HIT));
    }

    SetPerfLevel(kDisable);
    dbfull()->ReleaseSnapshot(snap1);
    dbfull()->ReleaseSnapshot(snap2);
  } while (ChangeCompactOptions());
}
|
|
|
|
// Run the row-cache MultiGet test with snapshots first, then without.
INSTANTIATE_TEST_CASE_P(DBMultiGetRowCacheTest, DBMultiGetRowCacheTest,
                        ::testing::Values(true, false));
|
|
|
|
// GetAllKeyVersions() must report one entry per write (insert, update and
// delete), both for the default column family and for an explicitly passed
// column family handle.
TEST_F(DBBasicTest, GetAllKeyVersions) {
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  options.disable_auto_compactions = true;
  CreateAndReopenWithCF({"pikachu"}, options);
  ASSERT_EQ(2, handles_.size());

  const size_t kNumInserts = 4;
  const size_t kNumDeletes = 4;
  const size_t kNumUpdates = 4;
  const size_t kNoLimit = std::numeric_limits<size_t>::max();
  const size_t kTotalWrites = kNumInserts + kNumDeletes + kNumUpdates;

  // Check default column family
  for (size_t i = 0; i < kNumInserts; ++i) {
    ASSERT_OK(Put(std::to_string(i), "value"));
  }
  for (size_t i = 0; i < kNumUpdates; ++i) {
    ASSERT_OK(Put(std::to_string(i), "value1"));
  }
  for (size_t i = 0; i < kNumDeletes; ++i) {
    ASSERT_OK(Delete(std::to_string(i)));
  }
  std::vector<KeyVersion> key_versions;
  // Overload without a column family handle.
  ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(db_, Slice(), Slice(),
                                                 kNoLimit, &key_versions));
  ASSERT_EQ(kTotalWrites, key_versions.size());
  // Overload with the default column family handle passed explicitly.
  ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
      db_, handles_[0], Slice(), Slice(), kNoLimit, &key_versions));
  ASSERT_EQ(kTotalWrites, key_versions.size());

  // Check non-default column family
  // One write fewer of each kind than in the default column family.
  for (size_t i = 0; i < kNumInserts - 1; ++i) {
    ASSERT_OK(Put(1, std::to_string(i), "value"));
  }
  for (size_t i = 0; i < kNumUpdates - 1; ++i) {
    ASSERT_OK(Put(1, std::to_string(i), "value1"));
  }
  for (size_t i = 0; i < kNumDeletes - 1; ++i) {
    ASSERT_OK(Delete(1, std::to_string(i)));
  }
  ASSERT_OK(ROCKSDB_NAMESPACE::GetAllKeyVersions(
      db_, handles_[1], Slice(), Slice(), kNoLimit, &key_versions));
  ASSERT_EQ(kTotalWrites - 3, key_versions.size());
}
|
|
#endif // !ROCKSDB_LITE
|
|
|
|
// Issues a batched MultiGet() against a table whose block size exceeds
// BlockBasedTable::kMultiGetReadStackBufSize. The test makes no assertions on
// the results; it only needs the lookup to complete.
TEST_F(DBBasicTest, MultiGetIOBufferOverrun) {
  Options options = CurrentOptions();
  Random rnd(301);
  BlockBasedTableOptions table_options;
  table_options.pin_l0_filter_and_index_blocks_in_cache = true;
  table_options.block_size = 16 * 1024;
  assert(table_options.block_size >
         BlockBasedTable::kMultiGetReadStackBufSize);
  options.table_factory.reset(new BlockBasedTableFactory(table_options));
  Reopen(options);

  std::string zero_str(128, '\0');
  for (int i = 0; i < 100; ++i) {
    // Make the value compressible. A purely random string doesn't compress
    // and the resultant data block will not be compressed
    std::string value(RandomString(&rnd, 128) + zero_str);
    // Not wrapped in assert(): that would drop the write when NDEBUG is
    // defined.
    ASSERT_OK(Put(Key(i), value));
  }
  ASSERT_OK(Flush());

  // `keys` holds Slices pointing into `key_data`, so reserve up front to keep
  // later emplace_back calls from reallocating and invalidating them.
  std::vector<std::string> key_data;
  key_data.reserve(10);
  std::vector<Slice> keys;
  // We cannot resize a PinnableSlice vector, so just set initial size to
  // largest we think we will need
  std::vector<PinnableSlice> values(10);
  std::vector<Status> statuses;
  ReadOptions ro;

  // Warm up the cache first
  key_data.emplace_back(Key(0));
  keys.emplace_back(Slice(key_data.back()));
  key_data.emplace_back(Key(50));
  keys.emplace_back(Slice(key_data.back()));
  statuses.resize(keys.size());

  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
}
|
|
|
|
class DBBasicTestWithParallelIO
|
|
: public DBTestBase,
|
|
public testing::WithParamInterface<std::tuple<bool, bool, bool, bool>> {
|
|
public:
|
|
DBBasicTestWithParallelIO() : DBTestBase("/db_basic_test_with_parallel_io") {
|
|
bool compressed_cache = std::get<0>(GetParam());
|
|
bool uncompressed_cache = std::get<1>(GetParam());
|
|
compression_enabled_ = std::get<2>(GetParam());
|
|
fill_cache_ = std::get<3>(GetParam());
|
|
|
|
if (compressed_cache) {
|
|
std::shared_ptr<Cache> cache = NewLRUCache(1048576);
|
|
compressed_cache_ = std::make_shared<MyBlockCache>(cache);
|
|
}
|
|
if (uncompressed_cache) {
|
|
std::shared_ptr<Cache> cache = NewLRUCache(1048576);
|
|
uncompressed_cache_ = std::make_shared<MyBlockCache>(cache);
|
|
}
|
|
|
|
env_->count_random_reads_ = true;
|
|
|
|
Options options = CurrentOptions();
|
|
Random rnd(301);
|
|
BlockBasedTableOptions table_options;
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
if (compression_enabled_) {
|
|
std::vector<CompressionType> compression_types;
|
|
compression_types = GetSupportedCompressions();
|
|
// Not every platform may have compression libraries available, so
|
|
// dynamically pick based on what's available
|
|
if (compression_types.size() == 0) {
|
|
compression_enabled_ = false;
|
|
} else {
|
|
options.compression = compression_types[0];
|
|
}
|
|
}
|
|
#else
|
|
// GetSupportedCompressions() is not available in LITE build
|
|
if (!Snappy_Supported()) {
|
|
compression_enabled_ = false;
|
|
}
|
|
#endif //ROCKSDB_LITE
|
|
|
|
table_options.block_cache = uncompressed_cache_;
|
|
if (table_options.block_cache == nullptr) {
|
|
table_options.no_block_cache = true;
|
|
} else {
|
|
table_options.pin_l0_filter_and_index_blocks_in_cache = true;
|
|
}
|
|
table_options.block_cache_compressed = compressed_cache_;
|
|
table_options.flush_block_policy_factory.reset(
|
|
new MyFlushBlockPolicyFactory());
|
|
options.table_factory.reset(new BlockBasedTableFactory(table_options));
|
|
if (!compression_enabled_) {
|
|
options.compression = kNoCompression;
|
|
}
|
|
Reopen(options);
|
|
|
|
std::string zero_str(128, '\0');
|
|
for (int i = 0; i < 100; ++i) {
|
|
// Make the value compressible. A purely random string doesn't compress
|
|
// and the resultant data block will not be compressed
|
|
values_.emplace_back(RandomString(&rnd, 128) + zero_str);
|
|
assert(Put(Key(i), values_[i]) == Status::OK());
|
|
}
|
|
Flush();
|
|
|
|
for (int i = 0; i < 100; ++i) {
|
|
// block cannot gain space by compression
|
|
uncompressable_values_.emplace_back(RandomString(&rnd, 256) + '\0');
|
|
std::string tmp_key = "a" + Key(i);
|
|
assert(Put(tmp_key, uncompressable_values_[i]) == Status::OK());
|
|
}
|
|
Flush();
|
|
}
|
|
|
|
bool CheckValue(int i, const std::string& value) {
|
|
if (values_[i].compare(value) == 0) {
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
bool CheckUncompressableValue(int i, const std::string& value) {
|
|
if (uncompressable_values_[i].compare(value) == 0) {
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
int num_lookups() { return uncompressed_cache_->num_lookups(); }
|
|
int num_found() { return uncompressed_cache_->num_found(); }
|
|
int num_inserts() { return uncompressed_cache_->num_inserts(); }
|
|
|
|
int num_lookups_compressed() { return compressed_cache_->num_lookups(); }
|
|
int num_found_compressed() { return compressed_cache_->num_found(); }
|
|
int num_inserts_compressed() { return compressed_cache_->num_inserts(); }
|
|
|
|
bool fill_cache() { return fill_cache_; }
|
|
bool compression_enabled() { return compression_enabled_; }
|
|
bool has_compressed_cache() { return compressed_cache_ != nullptr; }
|
|
bool has_uncompressed_cache() { return uncompressed_cache_ != nullptr; }
|
|
|
|
// Suite-level set-up and tear-down hooks; both intentionally do nothing.
static void SetUpTestCase() {
}
static void TearDownTestCase() {
}
|
|
|
|
private:
|
|
class MyFlushBlockPolicyFactory : public FlushBlockPolicyFactory {
|
|
public:
|
|
MyFlushBlockPolicyFactory() {}
|
|
|
|
virtual const char* Name() const override {
|
|
return "MyFlushBlockPolicyFactory";
|
|
}
|
|
|
|
virtual FlushBlockPolicy* NewFlushBlockPolicy(
|
|
const BlockBasedTableOptions& /*table_options*/,
|
|
const BlockBuilder& data_block_builder) const override {
|
|
return new MyFlushBlockPolicy(data_block_builder);
|
|
}
|
|
};
|
|
|
|
class MyFlushBlockPolicy : public FlushBlockPolicy {
|
|
public:
|
|
explicit MyFlushBlockPolicy(const BlockBuilder& data_block_builder)
|
|
: num_keys_(0), data_block_builder_(data_block_builder) {}
|
|
|
|
bool Update(const Slice& /*key*/, const Slice& /*value*/) override {
|
|
if (data_block_builder_.empty()) {
|
|
// First key in this block
|
|
num_keys_ = 1;
|
|
return false;
|
|
}
|
|
// Flush every 10 keys
|
|
if (num_keys_ == 10) {
|
|
num_keys_ = 1;
|
|
return true;
|
|
}
|
|
num_keys_++;
|
|
return false;
|
|
}
|
|
|
|
private:
|
|
int num_keys_;
|
|
const BlockBuilder& data_block_builder_;
|
|
};
|
|
|
|
class MyBlockCache : public Cache {
|
|
public:
|
|
explicit MyBlockCache(std::shared_ptr<Cache>& target)
|
|
: target_(target), num_lookups_(0), num_found_(0), num_inserts_(0) {}
|
|
|
|
virtual const char* Name() const override { return "MyBlockCache"; }
|
|
|
|
virtual Status Insert(const Slice& key, void* value, size_t charge,
|
|
void (*deleter)(const Slice& key, void* value),
|
|
Handle** handle = nullptr,
|
|
Priority priority = Priority::LOW) override {
|
|
num_inserts_++;
|
|
return target_->Insert(key, value, charge, deleter, handle, priority);
|
|
}
|
|
|
|
virtual Handle* Lookup(const Slice& key,
|
|
Statistics* stats = nullptr) override {
|
|
num_lookups_++;
|
|
Handle* handle = target_->Lookup(key, stats);
|
|
if (handle != nullptr) {
|
|
num_found_++;
|
|
}
|
|
return handle;
|
|
}
|
|
|
|
virtual bool Ref(Handle* handle) override { return target_->Ref(handle); }
|
|
|
|
virtual bool Release(Handle* handle, bool force_erase = false) override {
|
|
return target_->Release(handle, force_erase);
|
|
}
|
|
|
|
virtual void* Value(Handle* handle) override {
|
|
return target_->Value(handle);
|
|
}
|
|
|
|
virtual void Erase(const Slice& key) override { target_->Erase(key); }
|
|
virtual uint64_t NewId() override { return target_->NewId(); }
|
|
|
|
virtual void SetCapacity(size_t capacity) override {
|
|
target_->SetCapacity(capacity);
|
|
}
|
|
|
|
virtual void SetStrictCapacityLimit(bool strict_capacity_limit) override {
|
|
target_->SetStrictCapacityLimit(strict_capacity_limit);
|
|
}
|
|
|
|
virtual bool HasStrictCapacityLimit() const override {
|
|
return target_->HasStrictCapacityLimit();
|
|
}
|
|
|
|
virtual size_t GetCapacity() const override {
|
|
return target_->GetCapacity();
|
|
}
|
|
|
|
virtual size_t GetUsage() const override { return target_->GetUsage(); }
|
|
|
|
virtual size_t GetUsage(Handle* handle) const override {
|
|
return target_->GetUsage(handle);
|
|
}
|
|
|
|
virtual size_t GetPinnedUsage() const override {
|
|
return target_->GetPinnedUsage();
|
|
}
|
|
|
|
virtual size_t GetCharge(Handle* /*handle*/) const override { return 0; }
|
|
|
|
virtual void ApplyToAllCacheEntries(void (*callback)(void*, size_t),
|
|
bool thread_safe) override {
|
|
return target_->ApplyToAllCacheEntries(callback, thread_safe);
|
|
}
|
|
|
|
virtual void EraseUnRefEntries() override {
|
|
return target_->EraseUnRefEntries();
|
|
}
|
|
|
|
int num_lookups() { return num_lookups_; }
|
|
|
|
int num_found() { return num_found_; }
|
|
|
|
int num_inserts() { return num_inserts_; }
|
|
|
|
private:
|
|
std::shared_ptr<Cache> target_;
|
|
int num_lookups_;
|
|
int num_found_;
|
|
int num_inserts_;
|
|
};
|
|
|
|
// Counting wrapper around the compressed block cache; null when absent
// (see has_compressed_cache()).
std::shared_ptr<MyBlockCache> compressed_cache_;
// Counting wrapper around the uncompressed block cache; null when absent
// (see has_uncompressed_cache()).
std::shared_ptr<MyBlockCache> uncompressed_cache_;
// Reported by compression_enabled().
bool compression_enabled_;
// NOTE(review): presumably the expected values behind CheckValue() and
// CheckUncompressableValue() -- confirm against the part of the fixture
// that fills these vectors.
std::vector<std::string> values_;
std::vector<std::string> uncompressable_values_;
// Reported by fill_cache(); the tests copy it into ReadOptions::fill_cache.
bool fill_cache_;
|
|
};
|
|
|
|
// Exercises the batched MultiGet API and checks both the returned values and
// the number of random file reads counted by env_->random_read_counter_ for
// the cache / compression / fill_cache combination of this instantiation.
TEST_P(DBBasicTestWithParallelIO, MultiGet) {
  std::vector<std::string> key_data(10);
  std::vector<Slice> keys;
  // We cannot resize a PinnableSlice vector, so just set initial size to
  // largest we think we will need
  std::vector<PinnableSlice> values(10);
  std::vector<Status> statuses;
  ReadOptions ro;
  ro.fill_cache = fill_cache();

  // Warm up the cache first. key_data already holds 10 empty strings, so the
  // two warm-up keys are appended behind them.
  key_data.emplace_back(Key(0));
  keys.emplace_back(Slice(key_data.back()));
  key_data.emplace_back(Key(50));
  keys.emplace_back(Slice(key_data.back()));
  statuses.resize(keys.size());

  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  ASSERT_TRUE(CheckValue(0, values[0].ToString()));
  ASSERT_TRUE(CheckValue(50, values[1].ToString()));

  // Baseline read count; every later expectation is relative to it.
  int random_reads = env_->random_read_counter_.Read();
  key_data[0] = Key(1);
  key_data[1] = Key(51);
  keys[0] = Slice(key_data[0]);
  keys[1] = Slice(key_data[1]);
  values[0].Reset();
  values[1].Reset();
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  ASSERT_TRUE(CheckValue(1, values[0].ToString()));
  ASSERT_TRUE(CheckValue(51, values[1].ToString()));

  // The second lookup is expected to be served from cache only when the
  // reads fill the cache and either an uncompressed cache exists, or a
  // compressed cache exists and compression is enabled.
  bool read_from_cache = false;
  if (fill_cache()) {
    if (has_uncompressed_cache()) {
      read_from_cache = true;
    } else if (has_compressed_cache() && compression_enabled()) {
      read_from_cache = true;
    }
  }

  int expected_reads = random_reads + (read_from_cache ? 0 : 2);
  ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);

  // Ten keys checked with CheckValue().
  keys.resize(10);
  statuses.resize(10);
  std::vector<int> key_ints{1, 2, 15, 16, 55, 81, 82, 83, 84, 85};
  for (size_t i = 0; i < key_ints.size(); ++i) {
    key_data[i] = Key(key_ints[i]);
    keys[i] = Slice(key_data[i]);
    statuses[i] = Status::OK();
    values[i].Reset();
  }
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  for (size_t i = 0; i < key_ints.size(); ++i) {
    ASSERT_OK(statuses[i]);
    ASSERT_TRUE(CheckValue(key_ints[i], values[i].ToString()));
  }
  if (compression_enabled() && !has_compressed_cache()) {
    expected_reads += (read_from_cache ? 2 : 3);
  } else {
    expected_reads += (read_from_cache ? 2 : 4);
  }
  ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);

  // The same ten indices with an "a" key prefix, checked with
  // CheckUncompressableValue(). The expected read count here does not depend
  // on read_from_cache.
  keys.resize(10);
  statuses.resize(10);
  std::vector<int> key_uncmp{1, 2, 15, 16, 55, 81, 82, 83, 84, 85};
  for (size_t i = 0; i < key_uncmp.size(); ++i) {
    key_data[i] = "a" + Key(key_uncmp[i]);
    keys[i] = Slice(key_data[i]);
    statuses[i] = Status::OK();
    values[i].Reset();
  }
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  for (size_t i = 0; i < key_uncmp.size(); ++i) {
    ASSERT_OK(statuses[i]);
    ASSERT_TRUE(CheckUncompressableValue(key_uncmp[i], values[i].ToString()));
  }
  if (compression_enabled() && !has_compressed_cache()) {
    expected_reads += 3;
  } else {
    expected_reads += 4;
  }
  ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);

  // Re-read the first five "a"-prefixed keys.
  keys.resize(5);
  statuses.resize(5);
  std::vector<int> key_tr{1, 2, 15, 16, 55};
  for (size_t i = 0; i < key_tr.size(); ++i) {
    key_data[i] = "a" + Key(key_tr[i]);
    keys[i] = Slice(key_data[i]);
    statuses[i] = Status::OK();
    values[i].Reset();
  }
  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  for (size_t i = 0; i < key_tr.size(); ++i) {
    ASSERT_OK(statuses[i]);
    ASSERT_TRUE(CheckUncompressableValue(key_tr[i], values[i].ToString()));
  }
  if (compression_enabled() && !has_compressed_cache()) {
    expected_reads += (read_from_cache ? 0 : 2);
    ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
  } else {
    if (has_uncompressed_cache()) {
      expected_reads += (read_from_cache ? 0 : 3);
      ASSERT_EQ(env_->random_read_counter_.Read(), expected_reads);
    } else {
      // A rare case: even with block compression enabled, some data blocks
      // are stored uncompressed because of their content. If the user only
      // enables the compressed cache, those uncompressed blocks will not be
      // cached, and block reads will be triggered. The number of reads
      // depends on the compression algorithm, so only a lower bound is
      // checked.
      ASSERT_GE(env_->random_read_counter_.Read(), expected_reads);
    }
  }
}
|
|
|
|
// Injects a Corruption status into the second block checksum verification
// performed by MultiGet and checks that only the second key reports it.
TEST_P(DBBasicTestWithParallelIO, MultiGetWithChecksumMismatch) {
  std::vector<std::string> key_data(10);
  std::vector<Slice> keys;
  // A PinnableSlice vector cannot be resized, so it is created with the
  // largest size we expect to need.
  std::vector<PinnableSlice> values(10);
  std::vector<Status> statuses;
  int read_count = 0;
  ReadOptions ro;
  ro.fill_cache = fill_cache();

  // The sync point hands over a Status*; the second invocation overwrites it
  // with Corruption.
  SyncPoint::GetInstance()->SetCallBack(
      "RetrieveMultipleBlocks:VerifyChecksum", [&](void* status) {
        Status* const verify_status = static_cast<Status*>(status);
        if (++read_count == 2) {
          *verify_status = Status::Corruption();
        }
      });
  SyncPoint::GetInstance()->EnableProcessing();

  // Look up two keys; key_data already holds 10 empty strings, so these are
  // appended behind them.
  key_data.emplace_back(Key(0));
  keys.emplace_back(Slice(key_data.back()));
  key_data.emplace_back(Key(50));
  keys.emplace_back(Slice(key_data.back()));
  statuses.resize(keys.size());

  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  ASSERT_TRUE(CheckValue(0, values[0].ToString()));
  // values[1] is deliberately not checked: its status must be Corruption.
  ASSERT_EQ(statuses[0], Status::OK());
  ASSERT_EQ(statuses[1], Status::Corruption());

  SyncPoint::GetInstance()->DisableProcessing();
}
|
|
|
|
// Forces the table lookup inside MultiGet to fail with IOError and checks
// that every requested key reports that error.
TEST_P(DBBasicTestWithParallelIO, MultiGetWithMissingFile) {
  std::vector<std::string> key_data(10);
  std::vector<Slice> keys;
  // We cannot resize a PinnableSlice vector, so just set initial size to
  // largest we think we will need
  std::vector<PinnableSlice> values(10);
  std::vector<Status> statuses;
  ReadOptions ro;
  ro.fill_cache = fill_cache();

  // Replace the status of the table lookup with IOError.
  SyncPoint::GetInstance()->SetCallBack(
      "TableCache::MultiGet:FindTable", [](void* status) {
        Status* s = static_cast<Status*>(status);
        *s = Status::IOError();
      });
  // DB open will create table readers unless we reduce the table cache
  // capacity.
  // SanitizeOptions will set max_open_files to minimum of 20. Table cache
  // is allocated with max_open_files - 10 as capacity. So override
  // max_open_files to 11 so table cache capacity will become 1. This will
  // prevent file open during DB open and force the file to be opened
  // during MultiGet
  SyncPoint::GetInstance()->SetCallBack(
      "SanitizeOptions::AfterChangeMaxOpenFiles", [](void* arg) {
        int* max_open_files = static_cast<int*>(arg);
        *max_open_files = 11;
      });
  SyncPoint::GetInstance()->EnableProcessing();

  Reopen(CurrentOptions());

  // Two keys to look up; key_data already holds 10 empty strings, so these
  // are appended behind them.
  key_data.emplace_back(Key(0));
  keys.emplace_back(Slice(key_data.back()));
  key_data.emplace_back(Key(50));
  keys.emplace_back(Slice(key_data.back()));
  statuses.resize(keys.size());

  dbfull()->MultiGet(ro, dbfull()->DefaultColumnFamily(), keys.size(),
                     keys.data(), values.data(), statuses.data(), true);
  ASSERT_EQ(statuses[0], Status::IOError());
  ASSERT_EQ(statuses[1], Status::IOError());

  SyncPoint::GetInstance()->DisableProcessing();
}
|
|
|
|
// Runs every DBBasicTestWithParallelIO test over all 16 combinations of the
// four boolean parameters.
INSTANTIATE_TEST_CASE_P(
    ParallelIO, DBBasicTestWithParallelIO,
    // Parameter tuple layout:
    //   0 - compressed cache enabled
    //   1 - uncompressed cache enabled
    //   2 - data compression enabled
    //   3 - ReadOptions::fill_cache
    ::testing::Combine(::testing::Bool(), ::testing::Bool(),
                       ::testing::Bool(), ::testing::Bool()));
|
|
|
|
class DBBasicTestWithTimestampBase : public DBTestBase {
|
|
public:
|
|
explicit DBBasicTestWithTimestampBase(const std::string& dbname)
|
|
: DBTestBase(dbname) {}
|
|
|
|
protected:
|
|
static std::string Key1(uint64_t k) {
|
|
uint32_t x = 1;
|
|
const bool is_little_endian = (*reinterpret_cast<char*>(&x) != 0);
|
|
std::string ret;
|
|
if (is_little_endian) {
|
|
ret.assign(reinterpret_cast<char*>(&k), sizeof(k));
|
|
} else {
|
|
ret.resize(sizeof(k));
|
|
ret[0] = k & 0xff;
|
|
ret[1] = (k >> 8) & 0xff;
|
|
ret[2] = (k >> 16) & 0xff;
|
|
ret[3] = (k >> 24) & 0xff;
|
|
ret[4] = (k >> 32) & 0xff;
|
|
ret[5] = (k >> 40) & 0xff;
|
|
ret[6] = (k >> 48) & 0xff;
|
|
ret[7] = (k >> 56) & 0xff;
|
|
}
|
|
std::reverse(ret.begin(), ret.end());
|
|
return ret;
|
|
}
|
|
|
|
class TestComparator : public Comparator {
|
|
private:
|
|
const Comparator* cmp_without_ts_;
|
|
|
|
public:
|
|
explicit TestComparator(size_t ts_sz)
|
|
: Comparator(ts_sz), cmp_without_ts_(nullptr) {
|
|
cmp_without_ts_ = BytewiseComparator();
|
|
}
|
|
|
|
const char* Name() const override { return "TestComparator"; }
|
|
|
|
void FindShortSuccessor(std::string*) const override {}
|
|
|
|
void FindShortestSeparator(std::string*, const Slice&) const override {}
|
|
|
|
int Compare(const Slice& a, const Slice& b) const override {
|
|
int r = CompareWithoutTimestamp(a, b);
|
|
if (r != 0 || 0 == timestamp_size()) {
|
|
return r;
|
|
}
|
|
return -CompareTimestamp(
|
|
Slice(a.data() + a.size() - timestamp_size(), timestamp_size()),
|
|
Slice(b.data() + b.size() - timestamp_size(), timestamp_size()));
|
|
}
|
|
|
|
using Comparator::CompareWithoutTimestamp;
|
|
int CompareWithoutTimestamp(const Slice& a, bool a_has_ts, const Slice& b,
|
|
bool b_has_ts) const override {
|
|
if (a_has_ts) {
|
|
assert(a.size() >= timestamp_size());
|
|
}
|
|
if (b_has_ts) {
|
|
assert(b.size() >= timestamp_size());
|
|
}
|
|
Slice lhs = a_has_ts ? StripTimestampFromUserKey(a, timestamp_size()) : a;
|
|
Slice rhs = b_has_ts ? StripTimestampFromUserKey(b, timestamp_size()) : b;
|
|
return cmp_without_ts_->Compare(lhs, rhs);
|
|
}
|
|
|
|
int CompareTimestamp(const Slice& ts1, const Slice& ts2) const override {
|
|
if (!ts1.data() && !ts2.data()) {
|
|
return 0;
|
|
} else if (ts1.data() && !ts2.data()) {
|
|
return 1;
|
|
} else if (!ts1.data() && ts2.data()) {
|
|
return -1;
|
|
}
|
|
assert(ts1.size() == ts2.size());
|
|
uint64_t low1 = 0;
|
|
uint64_t low2 = 0;
|
|
uint64_t high1 = 0;
|
|
uint64_t high2 = 0;
|
|
const size_t kSize = ts1.size();
|
|
std::unique_ptr<char[]> ts1_buf(new char[kSize]);
|
|
memcpy(ts1_buf.get(), ts1.data(), ts1.size());
|
|
std::unique_ptr<char[]> ts2_buf(new char[kSize]);
|
|
memcpy(ts2_buf.get(), ts2.data(), ts2.size());
|
|
Slice ts1_copy = Slice(ts1_buf.get(), kSize);
|
|
Slice ts2_copy = Slice(ts2_buf.get(), kSize);
|
|
auto* ptr1 = const_cast<Slice*>(&ts1_copy);
|
|
auto* ptr2 = const_cast<Slice*>(&ts2_copy);
|
|
if (!GetFixed64(ptr1, &low1) || !GetFixed64(ptr1, &high1) ||
|
|
!GetFixed64(ptr2, &low2) || !GetFixed64(ptr2, &high2)) {
|
|
assert(false);
|
|
}
|
|
if (high1 < high2) {
|
|
return -1;
|
|
} else if (high1 > high2) {
|
|
return 1;
|
|
}
|
|
if (low1 < low2) {
|
|
return -1;
|
|
} else if (low1 > low2) {
|
|
return 1;
|
|
}
|
|
return 0;
|
|
}
|
|
};
|
|
|
|
std::string Timestamp(uint64_t low, uint64_t high) {
|
|
std::string ts;
|
|
PutFixed64(&ts, low);
|
|
PutFixed64(&ts, high);
|
|
return ts;
|
|
}
|
|
|
|
void CheckIterUserEntry(const Iterator* it, const Slice& expected_key,
|
|
const Slice& expected_value,
|
|
const Slice& expected_ts) const {
|
|
ASSERT_TRUE(it->Valid());
|
|
ASSERT_OK(it->status());
|
|
ASSERT_EQ(expected_key, it->key());
|
|
ASSERT_EQ(expected_value, it->value());
|
|
ASSERT_EQ(expected_ts, it->timestamp());
|
|
}
|
|
|
|
void CheckIterEntry(const Iterator* it, const Slice& expected_ukey,
|
|
SequenceNumber expected_seq, ValueType expected_val_type,
|
|
const Slice& expected_value, const Slice& expected_ts) {
|
|
ASSERT_TRUE(it->Valid());
|
|
ASSERT_OK(it->status());
|
|
std::string ukey_and_ts;
|
|
ukey_and_ts.assign(expected_ukey.data(), expected_ukey.size());
|
|
ukey_and_ts.append(expected_ts.data(), expected_ts.size());
|
|
ParsedInternalKey parsed_ikey(ukey_and_ts, expected_seq, expected_val_type);
|
|
std::string ikey;
|
|
AppendInternalKey(&ikey, parsed_ikey);
|
|
ASSERT_EQ(Slice(ikey), it->key());
|
|
if (expected_val_type == kTypeValue) {
|
|
ASSERT_EQ(expected_value, it->value());
|
|
}
|
|
ASSERT_EQ(expected_ts, it->timestamp());
|
|
}
|
|
};
|
|
|
|
class DBBasicTestWithTimestamp : public DBBasicTestWithTimestampBase {
|
|
public:
|
|
DBBasicTestWithTimestamp()
|
|
: DBBasicTestWithTimestampBase("db_basic_test_with_timestamp") {}
|
|
};
|
|
|
|
// Writes two generations of keys at different timestamps and verifies
// forward iteration at two read timestamps, first unbounded and then with
// progressively narrower lower/upper bounds.
TEST_F(DBBasicTestWithTimestamp, SimpleForwardIterate) {
  const int kNumKeysPerFile = 2048;
  const uint64_t kMaxKey = 16384;
  Options options = CurrentOptions();
  options.env = env_;
  // TODO(yanqin) re-enable auto compaction
  options.disable_auto_compactions = true;
  options.create_if_missing = true;
  const size_t kTimestampSize = Timestamp(0, 0).size();
  TestComparator test_cmp(kTimestampSize);
  options.comparator = &test_cmp;
  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
  DestroyAndReopen(options);
  // Generation i covers keys [start_keys[i], kMaxKey] at write_timestamps[i]
  // and is read back at read_timestamps[i].
  const std::vector<uint64_t> start_keys = {1, 0};
  const std::vector<std::string> write_timestamps = {Timestamp(1, 0),
                                                     Timestamp(3, 0)};
  const std::vector<std::string> read_timestamps = {Timestamp(2, 0),
                                                    Timestamp(4, 0)};
  for (size_t i = 0; i < write_timestamps.size(); ++i) {
    WriteOptions write_opts;
    Slice write_ts = write_timestamps[i];
    write_opts.timestamp = &write_ts;
    for (uint64_t key = start_keys[i]; key <= kMaxKey; ++key) {
      Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(i));
      ASSERT_OK(s);
    }
  }
  for (size_t i = 0; i < read_timestamps.size(); ++i) {
    ReadOptions read_opts;
    Slice read_ts = read_timestamps[i];
    read_opts.timestamp = &read_ts;
    std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
    // Unsigned counter: it is compared against unsigned key arithmetic.
    uint64_t count = 0;
    uint64_t key = 0;
    for (it->Seek(Key1(0)), key = start_keys[i]; it->Valid();
         it->Next(), ++count, ++key) {
      CheckIterUserEntry(it.get(), Key1(key), "value" + std::to_string(i),
                         write_timestamps[i]);
    }
    const uint64_t expected_count = kMaxKey - start_keys[i] + 1;
    ASSERT_EQ(expected_count, count);

    // SeekToFirst() with lower bound.
    // Then iter with lower and upper bounds.
    uint64_t l = 0;
    uint64_t r = kMaxKey + 1;
    while (l < r) {
      std::string lb_str = Key1(l);
      Slice lb = lb_str;
      std::string ub_str = Key1(r);
      Slice ub = ub_str;
      read_opts.iterate_lower_bound = &lb;
      read_opts.iterate_upper_bound = &ub;
      it.reset(db_->NewIterator(read_opts));
      for (it->SeekToFirst(), key = std::max(l, start_keys[i]), count = 0;
           it->Valid(); it->Next(), ++key, ++count) {
        CheckIterUserEntry(it.get(), Key1(key), "value" + std::to_string(i),
                           write_timestamps[i]);
      }
      // Keys in [max(l, start), r) are expected, r being exclusive.
      ASSERT_EQ(r - std::max(l, start_keys[i]), count);
      // Shrink the window from both sides.
      l += (kMaxKey / 100);
      r -= (kMaxKey / 100);
    }
  }
  Close();
}
|
|
|
|
// Writes the same key range at several timestamps, remembering the latest
// sequence number after each pass, then iterates with iter_start_seqnum set
// and verifies the internal keys that are returned.
TEST_F(DBBasicTestWithTimestamp, ForwardIterateStartSeqnum) {
  const int kNumKeysPerFile = 2048;
  const uint64_t kMaxKey = 0xffffffffffffffff;
  const uint64_t kMinKey = kMaxKey - 16383;
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  // TODO(yanqin) re-enable auto compaction
  options.disable_auto_compactions = true;
  const size_t kTimestampSize = Timestamp(0, 0).size();
  TestComparator test_cmp(kTimestampSize);
  options.comparator = &test_cmp;
  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
  DestroyAndReopen(options);
  std::vector<SequenceNumber> start_seqs;

  const int kNumTimestamps = 4;
  std::vector<std::string> write_ts_list;
  for (int t = 0; t != kNumTimestamps; ++t) {
    write_ts_list.push_back(Timestamp(2 * t, /*do not care*/ 17));
  }
  WriteOptions write_opts;
  for (size_t i = 0; i != write_ts_list.size(); ++i) {
    Slice write_ts = write_ts_list[i];
    write_opts.timestamp = &write_ts;
    // kMaxKey is the largest uint64_t, so the loop exits with an explicit
    // break at kMaxKey instead of a `k <= kMaxKey` condition that could
    // never become false.
    for (uint64_t k = kMinKey;; ++k) {
      Status s = db_->Put(write_opts, Key1(k), "value" + std::to_string(i));
      ASSERT_OK(s);
      if (k == kMaxKey) {
        break;
      }
    }
    start_seqs.push_back(db_->GetLatestSequenceNumber());
  }
  std::vector<std::string> read_ts_list;
  for (int t = 0; t != kNumTimestamps - 1; ++t) {
    read_ts_list.push_back(Timestamp(2 * t + 3, /*do not care*/ 17));
  }
  ReadOptions read_opts;
  for (size_t i = 0; i != read_ts_list.size(); ++i) {
    Slice read_ts = read_ts_list[i];
    read_opts.timestamp = &read_ts;
    read_opts.iter_start_seqnum = start_seqs[i];
    std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
    // Every entry seen is expected to come from write pass i + 1, with
    // consecutive sequence numbers starting right after start_seqs[i].
    SequenceNumber next_seq = start_seqs[i] + 1;
    uint64_t next_key = kMinKey;
    for (iter->Seek(Key1(kMinKey)); iter->Valid(); iter->Next()) {
      CheckIterEntry(iter.get(), Key1(next_key), next_seq, kTypeValue,
                     "value" + std::to_string(i + 1), write_ts_list[i + 1]);
      ++next_key;
      ++next_seq;
    }
  }
  Close();
}
|
|
|
|
// Writes many versions of one key and reads at the oldest timestamp,
// expecting the iterator to perform exactly one reseek.
TEST_F(DBBasicTestWithTimestamp, ReseekToTargetTimestamp) {
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  constexpr size_t kNumKeys = 16;
  options.max_sequential_skip_in_iterations = kNumKeys / 2;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  // TODO(yanqin) re-enable auto compaction
  options.disable_auto_compactions = true;
  const size_t kTimestampSize = Timestamp(0, 0).size();
  TestComparator test_cmp(kTimestampSize);
  options.comparator = &test_cmp;
  DestroyAndReopen(options);
  // Insert kNumKeys versions of "foo" with timestamps 1..kNumKeys.
  WriteOptions write_opts;
  for (size_t version = 0; version != kNumKeys; ++version) {
    std::string ts_str = Timestamp(static_cast<uint64_t>(version + 1), 0);
    Slice ts = ts_str;
    write_opts.timestamp = &ts;
    ASSERT_OK(db_->Put(write_opts, "foo", "value" + std::to_string(version)));
  }
  {
    // Read at timestamp 1: only the first version ("value0") is expected.
    ReadOptions read_opts;
    std::string ts_str = Timestamp(1, 0);
    Slice ts = ts_str;
    read_opts.timestamp = &ts;
    std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
    iter->SeekToFirst();
    CheckIterUserEntry(iter.get(), "foo", "value0", ts_str);
    ASSERT_EQ(
        1, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION));
  }
  Close();
}
|
|
|
|
// Writes many versions of key "a" plus one batch touching "a" and "b", then
// checks that stepping from "a" to "b" takes exactly one reseek.
TEST_F(DBBasicTestWithTimestamp, ReseekToNextUserKey) {
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  constexpr size_t kNumKeys = 16;
  options.max_sequential_skip_in_iterations = kNumKeys / 2;
  options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
  // TODO(yanqin) re-enable auto compaction
  options.disable_auto_compactions = true;
  const size_t kTimestampSize = Timestamp(0, 0).size();
  TestComparator test_cmp(kTimestampSize);
  options.comparator = &test_cmp;
  DestroyAndReopen(options);
  // Write kNumKeys + 1 keys
  WriteOptions write_opts;
  Status s;
  // write_opts is reused for the batch write further down, so the Slice that
  // write_opts.timestamp points at must outlive the loop. Keeping the string
  // and the Slice at function scope avoids a dangling pointer.
  std::string put_ts_str;
  Slice put_ts;
  for (size_t i = 0; i != kNumKeys; ++i) {
    put_ts_str = Timestamp(static_cast<uint64_t>(i + 1), 0);
    put_ts = put_ts_str;
    write_opts.timestamp = &put_ts;
    s = db_->Put(write_opts, "a", "value" + std::to_string(i));
    ASSERT_OK(s);
  }
  {
    // One batch with a newer timestamp for both "a" and "b"; the timestamp
    // is stamped onto the batch via AssignTimestamp().
    std::string ts_str = Timestamp(static_cast<uint64_t>(kNumKeys + 1), 0);
    WriteBatch batch(0, 0, kTimestampSize);
    batch.Put("a", "new_value");
    batch.Put("b", "new_value");
    s = batch.AssignTimestamp(ts_str);
    ASSERT_OK(s);
    s = db_->Write(write_opts, &batch);
    ASSERT_OK(s);
  }
  {
    ReadOptions read_opts;
    std::string ts_str = Timestamp(static_cast<uint64_t>(kNumKeys + 1), 0);
    Slice ts = ts_str;
    read_opts.timestamp = &ts;
    std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
    iter->Seek("a");
    iter->Next();
    CheckIterUserEntry(iter.get(), "b", "new_value", ts_str);
    ASSERT_EQ(
        1, options.statistics->getTickerCount(NUMBER_OF_RESEEKS_IN_ITERATION));
  }
  Close();
}
|
|
|
|
// With max_skippable_internal_keys set, stepping onto a key with more
// versions than the limit must leave the iterator in Incomplete status.
TEST_F(DBBasicTestWithTimestamp, MaxKeysSkipped) {
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  const size_t kTimestampSize = Timestamp(0, 0).size();
  TestComparator test_cmp(kTimestampSize);
  options.comparator = &test_cmp;
  DestroyAndReopen(options);
  constexpr size_t max_skippable_internal_keys = 2;
  const size_t kNumKeys = max_skippable_internal_keys + 2;
  WriteOptions write_opts;
  {
    // A single version of "a" at timestamp 1.
    std::string ts_str = Timestamp(1, 0);
    Slice ts = ts_str;
    write_opts.timestamp = &ts;
    ASSERT_OK(db_->Put(write_opts, "a", "value"));
  }
  // kNumKeys versions of "b" with timestamps 1..kNumKeys.
  for (size_t version = 0; version < kNumKeys; ++version) {
    std::string ts_str = Timestamp(static_cast<uint64_t>(version + 1), 0);
    Slice ts = ts_str;
    write_opts.timestamp = &ts;
    ASSERT_OK(db_->Put(write_opts, "b", "value" + std::to_string(version)));
  }
  {
    ReadOptions read_opts;
    read_opts.max_skippable_internal_keys = max_skippable_internal_keys;
    std::string ts_str = Timestamp(1, 0);
    Slice ts = ts_str;
    read_opts.timestamp = &ts;
    std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));
    iter->SeekToFirst();
    iter->Next();
    ASSERT_TRUE(iter->status().IsIncomplete());
  }
  Close();
}
|
|
|
|
class DBBasicTestWithTimestampCompressionSettings
|
|
: public DBBasicTestWithTimestampBase,
|
|
public testing::WithParamInterface<std::tuple<
|
|
std::shared_ptr<const FilterPolicy>, CompressionType, uint32_t>> {
|
|
public:
|
|
DBBasicTestWithTimestampCompressionSettings()
|
|
: DBBasicTestWithTimestampBase(
|
|
"db_basic_test_with_timestamp_compression") {}
|
|
};
|
|
|
|
// Writes several timestamped versions of each key into two column families
// and reads every version back with Get() at a matching read timestamp.
TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGet) {
  const int kNumKeysPerFile = 8192;
  const size_t kNumTimestamps = 6;
  // Number of distinct keys written per timestamp.
  const size_t kNumKeysPerTimestamp = (kNumKeysPerFile - 1) / kNumTimestamps;
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.env = env_;
  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
  size_t ts_sz = Timestamp(0, 0).size();
  TestComparator test_cmp(ts_sz);
  options.comparator = &test_cmp;
  BlockBasedTableOptions bbto;
  bbto.filter_policy = std::get<0>(GetParam());
  bbto.whole_key_filtering = true;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));

  // Return early (passing trivially) for compression types that are not
  // usable in this build.
  const CompressionType comp_type = std::get<1>(GetParam());
#if LZ4_VERSION_NUMBER < 10400  // r124+
  if (comp_type == kLZ4Compression || comp_type == kLZ4HCCompression) {
    return;
  }
#endif  // LZ4_VERSION_NUMBER < 10400
  if (!ZSTD_Supported() && comp_type == kZSTD) {
    return;
  }
  if (!Zlib_Supported() && comp_type == kZlibCompression) {
    return;
  }

  options.compression = comp_type;
  options.compression_opts.max_dict_bytes = std::get<2>(GetParam());
  if (comp_type == kZSTD) {
    options.compression_opts.zstd_max_train_bytes = std::get<2>(GetParam());
  }
  options.target_file_size_base = 1 << 26;  // 64MB
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);
  size_t num_cfs = handles_.size();
  ASSERT_EQ(2, num_cfs);
  std::vector<std::string> write_ts_list;
  std::vector<std::string> read_ts_list;

  // Write timestamps are even (0, 2, 4, ...); the matching read timestamps
  // are the odd numbers right above them.
  for (size_t i = 0; i != kNumTimestamps; ++i) {
    write_ts_list.push_back(Timestamp(i * 2, 0));
    read_ts_list.push_back(Timestamp(1 + i * 2, 0));
    const Slice write_ts = write_ts_list.back();
    WriteOptions wopts;
    wopts.timestamp = &write_ts;
    for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
      for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
        ASSERT_OK(Put(cf, Key1(j),
                      "value_" + std::to_string(j) + "_" + std::to_string(i),
                      wopts));
      }
    }
  }
  // Reads every key of every column family at each read timestamp and
  // expects the value written at the corresponding write timestamp.
  const auto& verify_db_func = [&]() {
    for (size_t i = 0; i != kNumTimestamps; ++i) {
      ReadOptions ropts;
      const Slice read_ts = read_ts_list[i];
      ropts.timestamp = &read_ts;
      for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
        ColumnFamilyHandle* cfh = handles_[cf];
        for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
          std::string value;
          ASSERT_OK(db_->Get(ropts, cfh, Key1(j), &value));
          ASSERT_EQ("value_" + std::to_string(j) + "_" + std::to_string(i),
                    value);
        }
      }
    }
  };
  verify_db_func();
  Close();
}
|
|
|
|
#ifndef ROCKSDB_LITE
|
|
// A class which remembers the name of each flushed file.
|
|
class FlushedFileCollector : public EventListener {
|
|
public:
|
|
FlushedFileCollector() {}
|
|
~FlushedFileCollector() override {}
|
|
|
|
void OnFlushCompleted(DB* /*db*/, const FlushJobInfo& info) override {
|
|
InstrumentedMutexLock lock(&mutex_);
|
|
flushed_files_.push_back(info.file_path);
|
|
}
|
|
|
|
std::vector<std::string> GetFlushedFiles() {
|
|
std::vector<std::string> result;
|
|
{
|
|
InstrumentedMutexLock lock(&mutex_);
|
|
result = flushed_files_;
|
|
}
|
|
return result;
|
|
}
|
|
|
|
void ClearFlushedFiles() {
|
|
InstrumentedMutexLock lock(&mutex_);
|
|
flushed_files_.clear();
|
|
}
|
|
|
|
private:
|
|
std::vector<std::string> flushed_files_;
|
|
InstrumentedMutex mutex_;
|
|
};
|
|
|
|
// Writes timestamped keys into two column families, flushing and compacting
// part-way through each timestamp, and verifies with Get() (value and
// returned timestamp) both while writing and once everything is written.
TEST_P(DBBasicTestWithTimestampCompressionSettings, PutAndGetWithCompaction) {
  const int kNumKeysPerFile = 8192;
  const size_t kNumTimestamps = 2;
  const size_t kNumKeysPerTimestamp = (kNumKeysPerFile - 1) / kNumTimestamps;
  const size_t kSplitPosBase = kNumKeysPerTimestamp / 2;
  Options options = CurrentOptions();
  options.create_if_missing = true;
  options.env = env_;
  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));

  // The listener is handed to options.listeners; `collector` stays behind as
  // a non-owning pointer for querying the flushed files.
  FlushedFileCollector* collector = new FlushedFileCollector();
  options.listeners.emplace_back(collector);

  size_t ts_sz = Timestamp(0, 0).size();
  TestComparator test_cmp(ts_sz);
  options.comparator = &test_cmp;
  BlockBasedTableOptions bbto;
  bbto.filter_policy = std::get<0>(GetParam());
  bbto.whole_key_filtering = true;
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));

  // Return early (passing trivially) for compression types that are not
  // usable in this build.
  const CompressionType comp_type = std::get<1>(GetParam());
#if LZ4_VERSION_NUMBER < 10400  // r124+
  if (comp_type == kLZ4Compression || comp_type == kLZ4HCCompression) {
    return;
  }
#endif  // LZ4_VERSION_NUMBER < 10400
  if (!ZSTD_Supported() && comp_type == kZSTD) {
    return;
  }
  if (!Zlib_Supported() && comp_type == kZlibCompression) {
    return;
  }

  options.compression = comp_type;
  options.compression_opts.max_dict_bytes = std::get<2>(GetParam());
  if (comp_type == kZSTD) {
    options.compression_opts.zstd_max_train_bytes = std::get<2>(GetParam());
  }
  DestroyAndReopen(options);
  CreateAndReopenWithCF({"pikachu"}, options);

  size_t num_cfs = handles_.size();
  ASSERT_EQ(2, num_cfs);
  std::vector<std::string> write_ts_list;
  std::vector<std::string> read_ts_list;

  // Reads key k at read_ts_list[i] and expects the value and the timestamp
  // written for timestamp index i.
  const auto& verify_record_func = [&](size_t i, size_t k,
                                       ColumnFamilyHandle* cfh) {
    std::string value;
    std::string timestamp;

    ReadOptions ropts;
    const Slice read_ts = read_ts_list[i];
    ropts.timestamp = &read_ts;
    std::string expected_timestamp =
        std::string(write_ts_list[i].data(), write_ts_list[i].size());

    ASSERT_OK(db_->Get(ropts, cfh, Key1(k), &value, &timestamp));
    ASSERT_EQ("value_" + std::to_string(k) + "_" + std::to_string(i), value);
    ASSERT_EQ(expected_timestamp, timestamp);
  };

  for (size_t i = 0; i != kNumTimestamps; ++i) {
    write_ts_list.push_back(Timestamp(i * 2, 0));
    read_ts_list.push_back(Timestamp(1 + i * 2, 0));
    const Slice write_ts = write_ts_list.back();
    WriteOptions wopts;
    wopts.timestamp = &write_ts;
    for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
      size_t memtable_get_start = 0;
      for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
        ASSERT_OK(Put(cf, Key1(j),
                      "value_" + std::to_string(j) + "_" + std::to_string(i),
                      wopts));
        if (j == kSplitPosBase + i || j == kNumKeysPerTimestamp - 1) {
          // Verify the keys written since the previous flush before
          // flushing them.
          for (size_t k = memtable_get_start; k <= j; ++k) {
            verify_record_func(i, k, handles_[cf]);
          }
          memtable_get_start = j + 1;

          // flush all keys with the same timestamp to two sst files, split at
          // incremental positions such that lowerlevel[1].smallest.userkey ==
          // higherlevel[0].largest.userkey
          ASSERT_OK(Flush(cf));

          // compact files (2 at each level) to a lower level such that all keys
          // with the same timestamp is at one level, with newer versions at
          // higher levels.
          // NOTE(review): the Status returned by CompactFiles() is not
          // checked; kept as in the original.
          CompactionOptions compact_opt;
          compact_opt.compression = kNoCompression;
          db_->CompactFiles(compact_opt, handles_[cf],
                            collector->GetFlushedFiles(),
                            static_cast<int>(kNumTimestamps - i));
          collector->ClearFlushedFiles();
        }
      }
    }
  }
  // Final pass: every key of every column family at every timestamp.
  const auto& verify_db_func = [&]() {
    for (size_t i = 0; i != kNumTimestamps; ++i) {
      for (int cf = 0; cf != static_cast<int>(num_cfs); ++cf) {
        ColumnFamilyHandle* cfh = handles_[cf];
        for (size_t j = 0; j != kNumKeysPerTimestamp; ++j) {
          verify_record_func(i, j, cfh);
        }
      }
    }
  };
  verify_db_func();
  Close();
}
|
|
#endif // !ROCKSDB_LITE
|
|
|
|
// Instantiates DBBasicTestWithTimestampCompressionSettings over the cross
// product of:
//   - filter policy: none, or a bloom filter built with 10 bits per key;
//   - compression type: none, zlib, LZ4, LZ4HC, ZSTD;
//   - an integer parameter taking 0 or 1 << 14 (its meaning is defined by
//     the fixture, which is outside this excerpt).
INSTANTIATE_TEST_CASE_P(
    Timestamp, DBBasicTestWithTimestampCompressionSettings,
    ::testing::Combine(
        ::testing::Values(
            std::shared_ptr<const FilterPolicy>(),
            std::shared_ptr<const FilterPolicy>(
                NewBloomFilterPolicy(10 /*bits_per_key*/, false))),
        ::testing::Values(kNoCompression, kZlibCompression, kLZ4Compression,
                          kLZ4HCCompression, kZSTD),
        ::testing::Values(0, 1 << 14)));
|
|
|
|
class DBBasicTestWithTimestampPrefixSeek
|
|
: public DBBasicTestWithTimestampBase,
|
|
public testing::WithParamInterface<
|
|
std::tuple<std::shared_ptr<const SliceTransform>,
|
|
std::shared_ptr<const FilterPolicy>, bool>> {
|
|
public:
|
|
DBBasicTestWithTimestampPrefixSeek()
|
|
: DBBasicTestWithTimestampBase(
|
|
"/db_basic_test_with_timestamp_prefix_seek") {}
|
|
};
|
|
|
|
// Writes every key in [kMinKey, kMaxKey] once per write timestamp, then, for
// each read timestamp, checks that:
//   1. seeking to kMaxKey yields exactly that one entry, and
//   2. seeking to each target and iterating forward visits every key from
//      the target up to the last key sharing the target's prefix, each
//      carrying the value and timestamp of the matching write round.
TEST_P(DBBasicTestWithTimestampPrefixSeek, ForwardIterateWithPrefix) {
  const size_t kNumKeysPerFile = 4096;
  Options options = CurrentOptions();
  options.env = env_;
  options.create_if_missing = true;
  // TODO(yanqin): re-enable auto compactions
  options.disable_auto_compactions = true;
  const size_t kTimestampSize = Timestamp(0, 0).size();
  TestComparator test_cmp(kTimestampSize);
  options.comparator = &test_cmp;
  options.prefix_extractor = std::get<0>(GetParam());
  options.memtable_factory.reset(new SpecialSkipListFactory(kNumKeysPerFile));
  BlockBasedTableOptions bbto;
  bbto.filter_policy = std::get<1>(GetParam());
  options.table_factory.reset(NewBlockBasedTableFactory(bbto));
  DestroyAndReopen(options);

  const uint64_t kMaxKey = 0xffffffffffffffff;
  const uint64_t kMinKey = 0xffffffffffff8000;
  const std::vector<std::string> write_ts_list = {Timestamp(3, 0xffffffff),
                                                  Timestamp(6, 0xffffffff)};
  WriteOptions write_opts;
  {
    for (size_t i = 0; i != write_ts_list.size(); ++i) {
      Slice write_ts = write_ts_list[i];
      write_opts.timestamp = &write_ts;
      uint64_t key = kMinKey;
      // kMaxKey is the largest uint64_t, so a `key <= kMaxKey` condition
      // could never become false; exit explicitly before wrapping around.
      do {
        Status s = db_->Put(write_opts, Key1(key), "value" + std::to_string(i));
        ASSERT_OK(s);
        if (key == kMaxKey) {
          break;
        }
        ++key;
      } while (true);
    }
  }
  const std::vector<std::string> read_ts_list = {Timestamp(5, 0xffffffff),
                                                 Timestamp(9, 0xffffffff)};
  {
    ReadOptions read_opts;
    read_opts.total_order_seek = false;
    read_opts.prefix_same_as_start = std::get<2>(GetParam());

    // Validate the extractor before anything below dereferences it.
    const SliceTransform* const pe = options.prefix_extractor.get();
    ASSERT_NE(nullptr, pe);

    fprintf(stdout, "%s %s %d\n", pe->Name(),
            bbto.filter_policy ? bbto.filter_policy->Name() : "null",
            static_cast<int>(read_opts.prefix_same_as_start));

    for (size_t i = 0; i != read_ts_list.size(); ++i) {
      Slice read_ts = read_ts_list[i];
      read_opts.timestamp = &read_ts;
      std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts));

      // Seek to kMaxKey
      iter->Seek(Key1(kMaxKey));
      CheckIterUserEntry(iter.get(), Key1(kMaxKey), "value" + std::to_string(i),
                         write_ts_list[i]);
      iter->Next();
      ASSERT_FALSE(iter->Valid());
    }

    const std::vector<uint64_t> targets = {kMinKey, kMinKey + 0x10,
                                           kMinKey + 0x100, kMaxKey};
    // Number of key bits that are NOT part of the prefix: 8 bits for every
    // byte of the encoded key that the extractor drops.
    const size_t kPrefixShift =
        8 * (Key1(0).size() - pe->Transform(Key1(0)).size());
    const uint64_t kPrefixMask =
        ~((static_cast<uint64_t>(1) << kPrefixShift) - 1);
    const uint64_t kNumKeysWithinPrefix =
        (static_cast<uint64_t>(1) << kPrefixShift);
    for (size_t i = 0; i != read_ts_list.size(); ++i) {
      Slice read_ts = read_ts_list[i];
      read_opts.timestamp = &read_ts;
      std::unique_ptr<Iterator> it(db_->NewIterator(read_opts));
      for (size_t j = 0; j != targets.size(); ++j) {
        std::string start_key = Key1(targets[j]);
        // Largest key with the same prefix bits as targets[j].
        // NOTE(review): relies on Key1 encoding most-significant byte first
        // so that a byte prefix maps to the high bits -- confirm.
        uint64_t expected_ub =
            (targets[j] & kPrefixMask) - 1 + kNumKeysWithinPrefix;
        uint64_t expected_key = targets[j];
        size_t count = 0;
        it->Seek(start_key);
        while (it->Valid()) {
          std::string saved_prev_key;
          saved_prev_key.assign(it->key().data(), it->key().size());

          // Out of prefix: when prefix_same_as_start is off, the test itself
          // stops once the iterator leaves the target's prefix.
          if (!read_opts.prefix_same_as_start &&
              pe->Transform(saved_prev_key) != pe->Transform(start_key)) {
            break;
          }
          CheckIterUserEntry(it.get(), Key1(expected_key),
                             "value" + std::to_string(i), write_ts_list[i]);
          ++count;
          ++expected_key;
          it->Next();
        }
        ASSERT_EQ(expected_ub - targets[j] + 1, count);
      }
    }
  }
  Close();
}
|
|
|
|
// TODO(yanqin): consider handling non-fixed-length prefix extractors, e.g.
|
|
// NoopTransform.
|
|
INSTANTIATE_TEST_CASE_P(
|
|
Timestamp, DBBasicTestWithTimestampPrefixSeek,
|
|
::testing::Combine(
|
|
::testing::Values(
|
|
std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(4)),
|
|
std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(7)),
|
|
std::shared_ptr<const SliceTransform>(NewFixedPrefixTransform(8))),
|
|
::testing::Values(std::shared_ptr<const FilterPolicy>(nullptr),
|
|
std::shared_ptr<const FilterPolicy>(
|
|
NewBloomFilterPolicy(10 /*bits_per_key*/, false)),
|
|
std::shared_ptr<const FilterPolicy>(
|
|
NewBloomFilterPolicy(20 /*bits_per_key*/,
|
|
false))),
|
|
::testing::Bool()));
|
|
|
|
} // namespace ROCKSDB_NAMESPACE
|
|
|
|
// RegisterCustomObjects is called by main() before the tests run.
// When ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS is defined the
// function is only declared here (with C linkage) and must be supplied by
// something linked into the test binary; otherwise a no-op is defined here.
#if defined(ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS)
extern "C" void RegisterCustomObjects(int argc, char** argv);
#else
void RegisterCustomObjects(int /*argc*/, char** /*argv*/) {}
#endif  // !ROCKSDB_UNITTESTS_WITH_CUSTOM_OBJECTS_FROM_STATIC_LIBS
|
|
|
|
int main(int argc, char** argv) {
|
|
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
RegisterCustomObjects(argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|