Added bytes XOR merge operator
Summary: Closes https://github.com/facebook/rocksdb/pull/575 I fixed the merge conflicts etc. Closes https://github.com/facebook/rocksdb/pull/3065 Differential Revision: D7128233 Pulled By: sagar0 fbshipit-source-id: 2c23a48c9f0432c290b0cd16a12fb691bb37820c
This commit is contained in:
parent
62277e15c3
commit
0a2354ca8f
@ -602,6 +602,7 @@ set(SOURCES
|
||||
utilities/leveldb_options/leveldb_options.cc
|
||||
utilities/lua/rocks_lua_compaction_filter.cc
|
||||
utilities/memory/memory_util.cc
|
||||
utilities/merge_operators/bytesxor.cc
|
||||
utilities/merge_operators/max.cc
|
||||
utilities/merge_operators/put.cc
|
||||
utilities/merge_operators/string_append/stringappend.cc
|
||||
|
1
TARGETS
1
TARGETS
@ -242,6 +242,7 @@ cpp_library(
|
||||
"utilities/leveldb_options/leveldb_options.cc",
|
||||
"utilities/lua/rocks_lua_compaction_filter.cc",
|
||||
"utilities/memory/memory_util.cc",
|
||||
"utilities/merge_operators/bytesxor.cc",
|
||||
"utilities/merge_operators/max.cc",
|
||||
"utilities/merge_operators/put.cc",
|
||||
"utilities/merge_operators/string_append/stringappend.cc",
|
||||
|
1
src.mk
1
src.mk
@ -182,6 +182,7 @@ LIB_SOURCES = \
|
||||
utilities/merge_operators/string_append/stringappend.cc \
|
||||
utilities/merge_operators/string_append/stringappend2.cc \
|
||||
utilities/merge_operators/uint64add.cc \
|
||||
utilities/merge_operators/bytesxor.cc \
|
||||
utilities/option_change_migration/option_change_migration.cc \
|
||||
utilities/options/options_util.cc \
|
||||
utilities/persistent_cache/block_cache_tier.cc \
|
||||
|
@ -70,6 +70,7 @@
|
||||
#include "util/xxhash.h"
|
||||
#include "utilities/blob_db/blob_db.h"
|
||||
#include "utilities/merge_operators.h"
|
||||
#include "utilities/merge_operators/bytesxor.h"
|
||||
#include "utilities/persistent_cache/block_cache_tier.h"
|
||||
|
||||
#ifdef OS_WIN
|
||||
@ -107,6 +108,7 @@ DEFINE_string(
|
||||
"readwhilemerging,"
|
||||
"readrandomwriterandom,"
|
||||
"updaterandom,"
|
||||
"xorupdaterandom,"
|
||||
"randomwithverify,"
|
||||
"fill100K,"
|
||||
"crc32c,"
|
||||
@ -151,6 +153,8 @@ DEFINE_string(
|
||||
"\tprefixscanrandom -- prefix scan N times in random order\n"
|
||||
"\tupdaterandom -- N threads doing read-modify-write for random "
|
||||
"keys\n"
|
||||
"\txorupdaterandom -- N threads doing read-XOR-write for "
|
||||
"random keys\n"
|
||||
"\tappendrandom -- N threads doing read-modify-write with "
|
||||
"growing values\n"
|
||||
"\tmergerandom -- same as updaterandom/appendrandom using merge"
|
||||
@ -2526,6 +2530,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
|
||||
method = &Benchmark::ReadRandomMergeRandom;
|
||||
} else if (name == "updaterandom") {
|
||||
method = &Benchmark::UpdateRandom;
|
||||
} else if (name == "xorupdaterandom") {
|
||||
method = &Benchmark::XORUpdateRandom;
|
||||
} else if (name == "appendrandom") {
|
||||
method = &Benchmark::AppendRandom;
|
||||
} else if (name == "mergerandom") {
|
||||
@ -4743,6 +4749,58 @@ void VerifyDBFromDB(std::string& truth_db_name) {
|
||||
thread->stats.AddMessage(msg);
|
||||
}
|
||||
|
||||
// Read-XOR-write for random keys. Xors the existing value with a randomly
|
||||
// generated value, and stores the result. Assuming A in the array of bytes
|
||||
// representing the existing value, we generate an array B of the same size,
|
||||
// then compute C = A^B as C[i]=A[i]^B[i], and store C
|
||||
void XORUpdateRandom(ThreadState* thread) {
|
||||
ReadOptions options(FLAGS_verify_checksum, true);
|
||||
RandomGenerator gen;
|
||||
std::string existing_value;
|
||||
int64_t found = 0;
|
||||
Duration duration(FLAGS_duration, readwrites_);
|
||||
|
||||
BytesXOROperator xor_operator;
|
||||
|
||||
std::unique_ptr<const char[]> key_guard;
|
||||
Slice key = AllocateKey(&key_guard);
|
||||
// the number of iterations is the larger of read_ or write_
|
||||
while (!duration.Done(1)) {
|
||||
DB* db = SelectDB(thread);
|
||||
GenerateKeyFromInt(thread->rand.Next() % FLAGS_num, FLAGS_num, &key);
|
||||
|
||||
auto status = db->Get(options, key, &existing_value);
|
||||
if (status.ok()) {
|
||||
++found;
|
||||
} else if (!status.IsNotFound()) {
|
||||
fprintf(stderr, "Get returned an error: %s\n",
|
||||
status.ToString().c_str());
|
||||
exit(1);
|
||||
}
|
||||
|
||||
Slice value = gen.Generate(value_size_);
|
||||
std::string new_value;
|
||||
|
||||
if (status.ok()) {
|
||||
Slice existing_value_slice = Slice(existing_value);
|
||||
xor_operator.XOR(&existing_value_slice, value, &new_value);
|
||||
} else {
|
||||
xor_operator.XOR(nullptr, value, &new_value);
|
||||
}
|
||||
|
||||
Status s = db->Put(write_options_, key, Slice(new_value));
|
||||
if (!s.ok()) {
|
||||
fprintf(stderr, "put error: %s\n", s.ToString().c_str());
|
||||
exit(1);
|
||||
}
|
||||
thread->stats.FinishedOps(nullptr, db, 1);
|
||||
}
|
||||
char msg[100];
|
||||
snprintf(msg, sizeof(msg),
|
||||
"( updates:%" PRIu64 " found:%" PRIu64 ")", readwrites_, found);
|
||||
thread->stats.AddMessage(msg);
|
||||
}
|
||||
|
||||
// Read-modify-write for random keys.
|
||||
// Each operation causes the key grow by value_size (simulating an append).
|
||||
// Generally used for benchmarking against merges of similar type
|
||||
|
@ -3,13 +3,13 @@
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
//
|
||||
#ifndef MERGE_OPERATORS_H
|
||||
#define MERGE_OPERATORS_H
|
||||
#pragma once
|
||||
#include "rocksdb/merge_operator.h"
|
||||
|
||||
#include <memory>
|
||||
#include <stdio.h>
|
||||
|
||||
#include "rocksdb/merge_operator.h"
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
@ -21,6 +21,7 @@ class MergeOperators {
|
||||
static std::shared_ptr<MergeOperator> CreateStringAppendOperator();
|
||||
static std::shared_ptr<MergeOperator> CreateStringAppendTESTOperator();
|
||||
static std::shared_ptr<MergeOperator> CreateMaxOperator();
|
||||
static std::shared_ptr<MergeOperator> CreateBytesXOROperator();
|
||||
|
||||
// Will return a different merge operator depending on the string.
|
||||
// TODO: Hook the "name" up to the actual Name() of the MergeOperators?
|
||||
@ -38,14 +39,13 @@ class MergeOperators {
|
||||
return CreateStringAppendTESTOperator();
|
||||
} else if (name == "max") {
|
||||
return CreateMaxOperator();
|
||||
} else if (name == "bytesxor") {
|
||||
return CreateBytesXOROperator();
|
||||
} else {
|
||||
// Empty or unknown, just return nullptr
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
#endif
|
||||
|
59
utilities/merge_operators/bytesxor.cc
Normal file
59
utilities/merge_operators/bytesxor.cc
Normal file
@ -0,0 +1,59 @@
|
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under both the GPLv2 (found in the
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
|
||||
#include <algorithm>
|
||||
#include <string>
|
||||
|
||||
#include "utilities/merge_operators/bytesxor.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
std::shared_ptr<MergeOperator> MergeOperators::CreateBytesXOROperator() {
|
||||
return std::make_shared<BytesXOROperator>();
|
||||
}
|
||||
|
||||
bool BytesXOROperator::Merge(const Slice& key,
|
||||
const Slice* existing_value,
|
||||
const Slice& value,
|
||||
std::string* new_value,
|
||||
Logger* logger) const {
|
||||
XOR(existing_value, value, new_value);
|
||||
return true;
|
||||
}
|
||||
|
||||
void BytesXOROperator::XOR(const Slice* existing_value,
|
||||
const Slice& value, std::string* new_value) const {
|
||||
if (!existing_value) {
|
||||
new_value->clear();
|
||||
new_value->assign(value.data(), value.size());
|
||||
return;
|
||||
}
|
||||
|
||||
size_t min_size = std::min(existing_value->size(), value.size());
|
||||
size_t max_size = std::max(existing_value->size(), value.size());
|
||||
|
||||
new_value->clear();
|
||||
new_value->reserve(max_size);
|
||||
|
||||
const char* existing_value_data = existing_value->data();
|
||||
const char* value_data = value.data();
|
||||
|
||||
for (size_t i = 0; i < min_size; i++) {
|
||||
new_value->push_back(existing_value_data[i] ^ value_data[i]);
|
||||
}
|
||||
|
||||
if (existing_value->size() == max_size) {
|
||||
for (size_t i = min_size; i < max_size; i++) {
|
||||
new_value->push_back(existing_value_data[i]);
|
||||
}
|
||||
} else {
|
||||
assert(value.size() == max_size);
|
||||
for (size_t i = min_size; i < max_size; i++) {
|
||||
new_value->push_back(value_data[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace rocksdb
|
42
utilities/merge_operators/bytesxor.h
Normal file
42
utilities/merge_operators/bytesxor.h
Normal file
@ -0,0 +1,42 @@
|
||||
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under both the GPLv2 (found in the
|
||||
// COPYING file in the root directory) and Apache 2.0 License
|
||||
// (found in the LICENSE.Apache file in the root directory).
|
||||
|
||||
#ifndef UTILITIES_MERGE_OPERATORS_BYTESXOR_H_
|
||||
#define UTILITIES_MERGE_OPERATORS_BYTESXOR_H_
|
||||
|
||||
#include <algorithm>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include "rocksdb/env.h"
|
||||
#include "rocksdb/merge_operator.h"
|
||||
#include "rocksdb/slice.h"
|
||||
#include "util/coding.h"
|
||||
#include "utilities/merge_operators.h"
|
||||
|
||||
namespace rocksdb {
|
||||
|
||||
// A 'model' merge operator that XORs two (same sized) array of bytes.
|
||||
// Implemented as an AssociativeMergeOperator for simplicity and example.
|
||||
class BytesXOROperator : public AssociativeMergeOperator {
|
||||
public:
|
||||
// XORs the two array of bytes one byte at a time and stores the result
|
||||
// in new_value. len is the number of xored bytes, and the length of new_value
|
||||
virtual bool Merge(const Slice& key,
|
||||
const Slice* existing_value,
|
||||
const Slice& value,
|
||||
std::string* new_value,
|
||||
Logger* logger) const override;
|
||||
|
||||
virtual const char* Name() const override {
|
||||
return "BytesXOR";
|
||||
}
|
||||
|
||||
void XOR(const Slice* existing_value, const Slice& value,
|
||||
std::string* new_value) const;
|
||||
};
|
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
#endif // UTILITIES_MERGE_OPERATORS_BYTESXOR_H_
|
Loading…
Reference in New Issue
Block a user