Merge pull request #230 from cockroachdb/spencerkimball/send-user-keys-to-v2-filter

Pass parsed user key to prefix extractor in V2 compaction
This commit is contained in:
Igor Canadi 2014-08-18 11:09:30 -04:00
commit c8ecfaedd0
3 changed files with 36 additions and 9 deletions

View File

@ -13,6 +13,7 @@
* Statistics APIs now take uint32_t as type instead of Tickers. Also make two access functions getTickerCount and histogramData const
* Add DB property rocksdb.estimate-num-keys, estimated number of live keys in DB.
* Add DB::GetIntProperty(), which returns DB properties that are integer as uint64_t.
* The Prefix Extractor used with V2 compaction filters is now passed user key to SliceTransform::Transform instead of unparsed RocksDB key.
## 3.3.0 (7/10/2014)
### New Features

View File

@ -261,6 +261,29 @@ static void CompactionFilterV2Filter(
}
}
// Custom prefix extractor for compaction filter V2 which extracts first 3 characters.
static void CFV2PrefixExtractorDestroy(void* arg) { }
static char* CFV2PrefixExtractorTransform(void* arg, const char* key, size_t length, size_t* dst_length) {
// Verify keys are maximum length 4; this verifies fix for a
// prior bug which was passing the RocksDB-encoded key with
// logical timestamp suffix instead of parsed user key.
if (length > 4) {
fprintf(stderr, "%s:%d: %s: key %s is not user key\n", __FILE__, __LINE__, phase, key);
abort();
}
*dst_length = length < 3 ? length : 3;
return (char*)key;
}
static unsigned char CFV2PrefixExtractorInDomain(void* state, const char* key, size_t length) {
return 1;
}
static unsigned char CFV2PrefixExtractorInRange(void* state, const char* key, size_t length) {
return 1;
}
static const char* CFV2PrefixExtractorName(void* state) {
return "TestCFV2PrefixExtractor";
}
// Custom compaction filter factory V2.
static void CompactionFilterFactoryV2Destroy(void* arg) {
rocksdb_slicetransform_destroy((rocksdb_slicetransform_t*)arg);
@ -585,9 +608,12 @@ int main(int argc, char** argv) {
{
rocksdb_compactionfilterfactoryv2_t* factory;
rocksdb_slicetransform_t* prefix_extractor;
prefix_extractor = rocksdb_slicetransform_create_fixed_prefix(3);
prefix_extractor = rocksdb_slicetransform_create(
NULL, CFV2PrefixExtractorDestroy, CFV2PrefixExtractorTransform,
CFV2PrefixExtractorInDomain, CFV2PrefixExtractorInRange,
CFV2PrefixExtractorName);
factory = rocksdb_compactionfilterfactoryv2_create(
prefix_extractor, prefix_extractor, CompactionFilterFactoryV2Destroy,
NULL, prefix_extractor, CompactionFilterFactoryV2Destroy,
CompactionFilterFactoryV2Create, CompactionFilterFactoryV2Name);
// Create new database
rocksdb_close(db);

View File

@ -3038,19 +3038,19 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
Slice key = backup_input->key();
Slice value = backup_input->value();
const SliceTransform* transformer =
cfd->options()->compaction_filter_factory_v2->GetPrefixExtractor();
const auto key_prefix = transformer->Transform(key);
if (!prefix_initialized) {
compact->cur_prefix_ = key_prefix.ToString();
prefix_initialized = true;
}
if (!ParseInternalKey(key, &ikey)) {
// log error
Log(options_.info_log, "[%s] Failed to parse key: %s",
cfd->GetName().c_str(), key.ToString().c_str());
continue;
} else {
const SliceTransform* transformer =
cfd->options()->compaction_filter_factory_v2->GetPrefixExtractor();
const auto key_prefix = transformer->Transform(ikey.user_key);
if (!prefix_initialized) {
compact->cur_prefix_ = key_prefix.ToString();
prefix_initialized = true;
}
// If the prefix remains the same, keep buffering
if (key_prefix.compare(Slice(compact->cur_prefix_)) == 0) {
// Apply the compaction filter V2 to all the kv pairs sharing