Support GetAllKeyVersions() for non-default cf (#5544)

Summary:
Previously `GetAllKeyVersions()` supports default column family only. This PR add support for other column families.

Test plan (devserver):
```
$make clean && COMPILE_WITH_ASAN=1 make -j32 db_basic_test
$./db_basic_test --gtest_filter=DBBasicTest.GetAllKeyVersions
```
All other unit tests must pass.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/5544

Differential Revision: D16147551

Pulled By: riversand963

fbshipit-source-id: 5a61aece2a32d789e150226a9b8d53f4a5760168
This commit is contained in:
Yanqin Jin 2019-07-07 22:40:52 -07:00 committed by Facebook Github Bot
parent 8d34806972
commit 7c76a7fba2
4 changed files with 77 additions and 4 deletions

View File

@ -15,6 +15,7 @@
* Add C bindings for secondary instance, i.e. DBImplSecondary.
* db_bench adds a "benchmark" stats_history, which prints out the whole stats history.
* Rate limited deletion of WALs is only enabled if DBOptions::wal_dir is not set, or explicitly set to db_name passed to DB::Open and DBOptions::db_paths is empty, or same as db_paths[0].path
* Overload GetAllKeyVersions() to support non-default column family.
### New Features
* Add an option `snap_refresh_nanos` (default to 0.1s) to periodically refresh the snapshot list in compaction jobs. Assign to 0 to disable the feature.

View File

@ -10,6 +10,7 @@
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "rocksdb/perf_context.h"
#include "rocksdb/utilities/debug.h"
#include "table/block_based/block_builder.h"
#include "test_util/fault_injection_test_env.h"
#if !defined(ROCKSDB_LITE)
@ -1286,6 +1287,55 @@ TEST_F(DBBasicTest, MultiGetBatchedMultiLevel) {
}
}
#ifndef ROCKSDB_LITE
TEST_F(DBBasicTest, GetAllKeyVersions) {
Options options = CurrentOptions();
options.env = env_;
options.create_if_missing = true;
options.disable_auto_compactions = true;
CreateAndReopenWithCF({"pikachu"}, options);
ASSERT_EQ(2, handles_.size());
const size_t kNumInserts = 4;
const size_t kNumDeletes = 4;
const size_t kNumUpdates = 4;
// Check default column family
for (size_t i = 0; i != kNumInserts; ++i) {
ASSERT_OK(Put(std::to_string(i), "value"));
}
for (size_t i = 0; i != kNumUpdates; ++i) {
ASSERT_OK(Put(std::to_string(i), "value1"));
}
for (size_t i = 0; i != kNumDeletes; ++i) {
ASSERT_OK(Delete(std::to_string(i)));
}
std::vector<KeyVersion> key_versions;
ASSERT_OK(rocksdb::GetAllKeyVersions(db_, Slice(), Slice(),
std::numeric_limits<size_t>::max(),
&key_versions));
ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates, key_versions.size());
ASSERT_OK(rocksdb::GetAllKeyVersions(db_, handles_[0], Slice(), Slice(),
std::numeric_limits<size_t>::max(),
&key_versions));
ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates, key_versions.size());
// Check non-default column family
for (size_t i = 0; i != kNumInserts - 1; ++i) {
ASSERT_OK(Put(1, std::to_string(i), "value"));
}
for (size_t i = 0; i != kNumUpdates - 1; ++i) {
ASSERT_OK(Put(1, std::to_string(i), "value1"));
}
for (size_t i = 0; i != kNumDeletes - 1; ++i) {
ASSERT_OK(Delete(1, std::to_string(i)));
}
ASSERT_OK(rocksdb::GetAllKeyVersions(db_, handles_[1], Slice(), Slice(),
std::numeric_limits<size_t>::max(),
&key_versions));
ASSERT_EQ(kNumInserts + kNumDeletes + kNumUpdates - 3, key_versions.size());
}
#endif // !ROCKSDB_LITE
class DBBasicTestWithParallelIO
: public DBTestBase,
public testing::WithParamInterface<std::tuple<bool,bool,bool,bool>> {

View File

@ -40,6 +40,10 @@ Status GetAllKeyVersions(DB* db, Slice begin_key, Slice end_key,
size_t max_num_ikeys,
std::vector<KeyVersion>* key_versions);
Status GetAllKeyVersions(DB* db, ColumnFamilyHandle* cfh, Slice begin_key,
Slice end_key, size_t max_num_ikeys,
std::vector<KeyVersion>* key_versions);
} // namespace rocksdb
#endif // ROCKSDB_LITE

View File

@ -14,16 +14,34 @@ namespace rocksdb {
Status GetAllKeyVersions(DB* db, Slice begin_key, Slice end_key,
size_t max_num_ikeys,
std::vector<KeyVersion>* key_versions) {
assert(key_versions != nullptr);
if (nullptr == db) {
return Status::InvalidArgument("db cannot be null.");
}
return GetAllKeyVersions(db, db->DefaultColumnFamily(), begin_key, end_key,
max_num_ikeys, key_versions);
}
Status GetAllKeyVersions(DB* db, ColumnFamilyHandle* cfh, Slice begin_key,
Slice end_key, size_t max_num_ikeys,
std::vector<KeyVersion>* key_versions) {
if (nullptr == db) {
return Status::InvalidArgument("db cannot be null.");
}
if (nullptr == cfh) {
return Status::InvalidArgument("Column family handle cannot be null.");
}
if (nullptr == key_versions) {
return Status::InvalidArgument("key_versions cannot be null.");
}
key_versions->clear();
DBImpl* idb = static_cast<DBImpl*>(db->GetRootDB());
auto icmp = InternalKeyComparator(idb->GetOptions().comparator);
auto icmp = InternalKeyComparator(idb->GetOptions(cfh).comparator);
ReadRangeDelAggregator range_del_agg(&icmp,
kMaxSequenceNumber /* upper_bound */);
Arena arena;
ScopedArenaIterator iter(
idb->NewInternalIterator(&arena, &range_del_agg, kMaxSequenceNumber));
ScopedArenaIterator iter(idb->NewInternalIterator(&arena, &range_del_agg,
kMaxSequenceNumber, cfh));
if (!begin_key.empty()) {
InternalKey ikey;