External Value Store

Summary:
Developing a capability for storing values on external backing file(s).

This is just a highly unoptimized first pass - supports:
1) Allocating some portion of external file to be used to store value
2) Freeing the range, enabling it to be reused by other values

As next steps, I plan to:
1) Create some kind of stress testing. Once I can measure stuff, I can focus on optimizing.
2) Optimize locking.
3) Optimize freelist data structure. Currently we have O(n) for both freeing and allocation.
4) Figure out how to do recovery.

Test Plan: Created a unit test.

Reviewers: dhruba, haobo, kailiu

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D13389
This commit is contained in:
Igor Canadi 2013-10-16 17:33:49 -07:00
parent 0f31843c15
commit fc4616d898
4 changed files with 585 additions and 0 deletions

View File

@ -48,6 +48,7 @@ TESTS = \
db_test \
dbformat_test \
env_test \
blob_store_test \
filelock_test \
filename_test \
filter_block_test \
@ -204,6 +205,9 @@ cache_test: util/cache_test.o $(LIBOBJECTS) $(TESTHARNESS)
coding_test: util/coding_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) util/coding_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
blob_store_test: util/blob_store_test.o $(LIBOBJECTS) $(TESTHARNESS) $(TESTUTIL)
$(CXX) util/blob_store_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o$@ $(LDFLAGS) $(COVERAGEFLAGS)
stringappend_test: utilities/merge_operators/string_append/stringappend_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) utilities/merge_operators/string_append/stringappend_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)

297
util/blob_store.cc Normal file
View File

@ -0,0 +1,297 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "util/blob_store.h"
namespace rocksdb {
using namespace std;
// BlobChunk
bool BlobChunk::ImmediatelyBefore(const BlobChunk& chunk) const {
// overlapping!?
assert(!Overlap(chunk));
// size == 0 is a marker, not a block
return size != 0 &&
bucket_id == chunk.bucket_id &&
offset + size == chunk.offset;
}
bool BlobChunk::Overlap(const BlobChunk &chunk) const {
return size != 0 && chunk.size != 0 && bucket_id == chunk.bucket_id &&
((offset >= chunk.offset && offset < chunk.offset + chunk.size) ||
(chunk.offset >= offset && chunk.offset < offset + size));
}
// Blob
string Blob::ToString() const {
string ret;
for (auto chunk : chunks) {
PutFixed32(&ret, chunk.bucket_id);
PutFixed32(&ret, chunk.offset);
PutFixed32(&ret, chunk.size);
}
return ret;
}
Blob::Blob(const std::string& blob) {
for (uint32_t i = 0; i < blob.size(); ) {
uint32_t t[3] = {0};
for (int j = 0; j < 3 && i + sizeof(uint32_t) - 1 < blob.size();
++j, i += sizeof(uint32_t)) {
t[j] = DecodeFixed32(blob.data() + i);
}
chunks.push_back(BlobChunk(t[0], t[1], t[2]));
}
}
// FreeList
FreeList::FreeList() {
// We add (0, 0, 0) blob because it makes our life easier and
// code cleaner. (0, 0, 0) is always in the list so we can
// guarantee that free_chunks_list_ != nullptr, which avoids
// lots of unnecessary ifs
free_chunks_list_ = (FreeChunk *)malloc(sizeof(FreeChunk));
free_chunks_list_->chunk = BlobChunk(0, 0, 0);
free_chunks_list_->next = nullptr;
}
FreeList::~FreeList() {
while (free_chunks_list_ != nullptr) {
FreeChunk* t = free_chunks_list_;
free_chunks_list_ = free_chunks_list_->next;
free(t);
}
}
Status FreeList::Free(const Blob& blob) {
MutexLock l(&mutex_);
// add it back to the free list
for (auto chunk : blob.chunks) {
FreeChunk* itr = free_chunks_list_;
// find a node AFTER which we'll add the block
for ( ; itr->next != nullptr && itr->next->chunk <= chunk;
itr = itr->next) {
}
// try to merge with previous block
if (itr->chunk.ImmediatelyBefore(chunk)) {
// merge
itr->chunk.size += chunk.size;
} else {
// Insert the block after itr
FreeChunk* t = (FreeChunk*)malloc(sizeof(FreeChunk));
if (t == nullptr) {
throw runtime_error("Malloc failed");
}
t->chunk = chunk;
t->next = itr->next;
itr->next = t;
itr = t;
}
// try to merge with the next block
if (itr->next != nullptr &&
itr->chunk.ImmediatelyBefore(itr->next->chunk)) {
FreeChunk *tobedeleted = itr->next;
itr->chunk.size += itr->next->chunk.size;
itr->next = itr->next->next;
free(tobedeleted);
}
}
return Status::OK();
}
Status FreeList::Allocate(uint32_t blocks, Blob* blob) {
MutexLock l(&mutex_);
FreeChunk** best_fit_node = nullptr;
// Find the smallest free chunk whose size is greater or equal to blocks
for (FreeChunk** itr = &free_chunks_list_; (*itr) != nullptr;
itr = &((*itr)->next)) {
if ((*itr)->chunk.size >= blocks &&
(best_fit_node == nullptr ||
(*best_fit_node)->chunk.size > (*itr)->chunk.size)) {
best_fit_node = itr;
}
}
if (best_fit_node == nullptr || *best_fit_node == nullptr) {
// Not enough memory
return Status::Incomplete("");
}
blob->SetOneChunk((*best_fit_node)->chunk.bucket_id,
(*best_fit_node)->chunk.offset,
blocks);
if ((*best_fit_node)->chunk.size > blocks) {
// just shorten best_fit_node
(*best_fit_node)->chunk.offset += blocks;
(*best_fit_node)->chunk.size -= blocks;
} else {
assert(blocks == (*best_fit_node)->chunk.size);
// delete best_fit_node
FreeChunk* t = *best_fit_node;
(*best_fit_node) = (*best_fit_node)->next;
free(t);
}
return Status::OK();
}
bool FreeList::Overlap(const Blob &blob) const {
MutexLock l(&mutex_);
for (auto chunk : blob.chunks) {
for (auto itr = free_chunks_list_; itr != nullptr; itr = itr->next) {
if (itr->chunk.Overlap(chunk)) {
return true;
}
}
}
return false;
}
// BlobStore
BlobStore::BlobStore(const string& directory,
uint64_t block_size,
uint32_t blocks_per_bucket,
Env* env) :
directory_(directory),
block_size_(block_size),
blocks_per_bucket_(blocks_per_bucket),
env_(env) {
env_->CreateDirIfMissing(directory_);
storage_options_.use_mmap_writes = false;
storage_options_.use_mmap_reads = false;
CreateNewBucket();
}
BlobStore::~BlobStore() {
// TODO we don't care about recovery for now
}
Status BlobStore::Put(const char* value, uint64_t size, Blob* blob) {
// convert size to number of blocks
Status s = Allocate((size + block_size_ - 1) / block_size_, blob);
if (!s.ok()) {
return s;
}
uint64_t offset = 0; // in bytes, not blocks
for (auto chunk : blob->chunks) {
uint64_t write_size = min(chunk.size * block_size_, size);
assert(chunk.bucket_id < buckets_.size());
s = buckets_[chunk.bucket_id].get()->Write(chunk.offset * block_size_,
Slice(value + offset,
write_size));
if (!s.ok()) {
Delete(*blob);
return s;
}
offset += write_size;
size -= write_size;
if (write_size < chunk.size * block_size_) {
// if we have any space left in the block, fill it up with zeros
string zero_string(chunk.size * block_size_ - write_size, 0);
s = buckets_[chunk.bucket_id].get()->Write(chunk.offset * block_size_ +
write_size,
Slice(zero_string));
}
}
if (size > 0) {
Delete(*blob);
return Status::IOError("Tried to write more data than fits in the blob");
}
return Status::OK();
}
Status BlobStore::Get(const Blob& blob,
string* value) const {
// assert that it doesn't overlap with free list
// it will get compiled out for release
assert(!free_list_.Overlap(blob));
uint32_t total_size = 0; // in blocks
for (auto chunk : blob.chunks) {
total_size += chunk.size;
}
assert(total_size > 0);
value->resize(total_size * block_size_);
uint64_t offset = 0; // in bytes, not blocks
for (auto chunk : blob.chunks) {
Slice result;
assert(chunk.bucket_id < buckets_.size());
Status s;
s = buckets_[chunk.bucket_id].get()->Read(chunk.offset * block_size_,
chunk.size * block_size_,
&result,
&value->at(offset));
if (!s.ok() || result.size() < chunk.size * block_size_) {
value->clear();
return Status::IOError("Could not read in from file");
}
offset += chunk.size * block_size_;
}
// remove the '\0's at the end of the string
value->erase(find(value->begin(), value->end(), '\0'), value->end());
return Status::OK();
}
Status BlobStore::Delete(const Blob& blob) {
return free_list_.Free(blob);
}
Status BlobStore::Allocate(uint32_t blocks, Blob* blob) {
// TODO we don't currently support fragmented blobs
MutexLock l(&allocate_mutex_);
assert(blocks <= blocks_per_bucket_);
Status s;
s = free_list_.Allocate(blocks, blob);
if (!s.ok()) {
CreateNewBucket();
s = free_list_.Allocate(blocks, blob);
}
return s;
}
Status BlobStore::CreateNewBucket() {
int new_bucket_id;
new_bucket_id = buckets_.size();
buckets_.push_back(unique_ptr<RandomRWFile>());
char fname[200];
sprintf(fname, "%s/%d.bs", directory_.c_str(), new_bucket_id);
Status s = env_->NewRandomRWFile(string(fname),
&buckets_[new_bucket_id],
storage_options_);
if (!s.ok()) {
buckets_.erase(buckets_.begin() + new_bucket_id);
return s;
}
s = buckets_[new_bucket_id].get()->Allocate(
0, block_size_ * blocks_per_bucket_);
if (!s.ok()) {
buckets_.erase(buckets_.begin() + new_bucket_id);
return s;
}
return free_list_.Free(Blob(new_bucket_id, 0, blocks_per_bucket_));
}
} // namespace rocksdb

151
util/blob_store.h Normal file
View File

@ -0,0 +1,151 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include "rocksdb/env.h"
#include "rocksdb/status.h"
#include "port/port.h"
#include "util/mutexlock.h"
#include "util/coding.h"
#include <list>
#include <cstdint>
#include <iostream>
#include <stdexcept>
#include <algorithm>
#include <cstdio>
namespace rocksdb {
struct BlobChunk {
uint32_t bucket_id;
uint32_t offset; // in blocks
uint32_t size; // in blocks
BlobChunk() {}
BlobChunk(uint32_t bucket_id, uint32_t offset, uint32_t size) :
bucket_id(bucket_id), offset(offset), size(size) {}
bool operator <= (const BlobChunk& chunk) const {
if (bucket_id != chunk.bucket_id) {
return bucket_id < chunk.bucket_id;
}
if (offset != chunk.offset) {
return offset < chunk.offset;
}
return true;
}
// returns true if it's immediately before chunk
bool ImmediatelyBefore(const BlobChunk& chunk) const;
// returns true if chunks overlap
bool Overlap(const BlobChunk &chunk) const;
};
// We represent each Blob as a string in format:
// bucket_id offset size|bucket_id offset size...
// The string can be used to reference the Blob stored on external
// device/file
// Not thread-safe!
struct Blob {
// Generates the string
std::string ToString() const;
// Parses the previously generated string
explicit Blob(const std::string& blob);
// Creates unfragmented Blob
Blob(uint32_t bucket_id, uint32_t offset, uint32_t size) {
SetOneChunk(bucket_id, offset, size);
}
Blob() {}
void SetOneChunk(uint32_t bucket_id, uint32_t offset, uint32_t size) {
chunks.clear();
chunks.push_back(BlobChunk(bucket_id, offset, size));
}
// bucket_id, offset, size
std::vector<BlobChunk> chunks;
};
// Keeps a list of free chunks
class FreeList {
public:
FreeList ();
~FreeList();
// Allocates a a blob. Stores the allocated blob in
// 'blob'. Returns non-OK status if it failed to allocate.
// Thread-safe
Status Allocate(uint32_t blocks, Blob* blob);
// Frees the blob for reuse. Thread-safe
Status Free(const Blob& blob);
// returns true if blob is overlapping with any of the
// chunks stored in free list
bool Overlap(const Blob &blob) const;
private:
struct FreeChunk {
BlobChunk chunk;
FreeChunk* next;
};
FreeChunk* free_chunks_list_;
mutable port::Mutex mutex_;
};
// thread-safe
class BlobStore {
public:
// directory - wherever the blobs should be stored. It will be created
// if missing
// block_size - self explanatory
// blocks_per_bucket - how many blocks we want to keep in one bucket.
// Bucket is a device or a file that we use to store the blobs.
// If we don't have enough blocks to allocate a new blob, we will
// try to create a new file or device.
// env - env for creating new files
BlobStore(const std::string& directory,
uint64_t block_size,
uint32_t blocks_per_bucket,
Env* env);
~BlobStore();
// Allocates space for size bytes (rounded up to be multiple of
// block size) and writes size bytes from value to a backing store.
// Sets Blob blob that can than be used for addressing the
// stored value. Returns non-OK status on error.
Status Put(const char* value, uint64_t size, Blob* blob);
// Value needs to have enough space to store all the loaded stuff.
// This function is thread safe!
Status Get(const Blob& blob, std::string* value) const;
// Frees the blob for reuse, but does not delete the data
// on the backing store.
Status Delete(const Blob& blob);
private:
const std::string directory_;
// block_size_ is uint64_t because when we multiply with
// blocks_size_ we want the result to be uint64_t or
// we risk overflowing
const uint64_t block_size_;
const uint32_t blocks_per_bucket_;
Env* env_;
EnvOptions storage_options_;
FreeList free_list_;
// protected by buckets mutex
std::vector<unique_ptr<RandomRWFile>> buckets_;
mutable port::Mutex allocate_mutex_;
// Calls FreeList allocate. If free list can't allocate
// new blob, creates new bucket and tries again
// Thread-safe
Status Allocate(uint32_t blocks, Blob* blob);
// Creates a new backing store and adds all the blocks
// from the new backing store to the free list
// NOT thread-safe, call with lock held
Status CreateNewBucket();
};
} // namespace rocksdb

133
util/blob_store_test.cc Normal file
View File

@ -0,0 +1,133 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "util/blob_store.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "util/random.h"
#include <cstdlib>
#include <string>
namespace rocksdb {
using namespace std;
class BlobStoreTest { };
TEST(BlobStoreTest, RangeParseTest) {
Blob e;
for (int i = 0; i < 5; ++i) {
e.chunks.push_back(BlobChunk(rand(), rand(), rand()));
}
string x = e.ToString();
Blob nx(x);
ASSERT_EQ(nx.ToString(), x);
}
// make sure we're reusing the freed space
TEST(BlobStoreTest, SanityTest) {
const uint64_t block_size = 10;
const uint32_t blocks_per_file = 20;
Random random(5);
BlobStore blob_store(test::TmpDir() + "/blob_store_test",
block_size,
blocks_per_file,
Env::Default());
string buf;
// put string of size 170
test::RandomString(&random, 170, &buf);
Blob r1;
ASSERT_OK(blob_store.Put(buf.data(), 170, &r1));
// use the first file
for (size_t i = 0; i < r1.chunks.size(); ++i) {
ASSERT_EQ(r1.chunks[0].bucket_id, 0u);
}
// put string of size 30
test::RandomString(&random, 30, &buf);
Blob r2;
ASSERT_OK(blob_store.Put(buf.data(), 30, &r2));
// use the first file
for (size_t i = 0; i < r2.chunks.size(); ++i) {
ASSERT_EQ(r2.chunks[0].bucket_id, 0u);
}
// delete blob of size 170
ASSERT_OK(blob_store.Delete(r1));
// put a string of size 100
test::RandomString(&random, 100, &buf);
Blob r3;
ASSERT_OK(blob_store.Put(buf.data(), 100, &r3));
// use the first file
for (size_t i = 0; i < r3.chunks.size(); ++i) {
ASSERT_EQ(r3.chunks[0].bucket_id, 0u);
}
// put a string of size 70
test::RandomString(&random, 70, &buf);
Blob r4;
ASSERT_OK(blob_store.Put(buf.data(), 70, &r4));
// use the first file
for (size_t i = 0; i < r4.chunks.size(); ++i) {
ASSERT_EQ(r4.chunks[0].bucket_id, 0u);
}
// put a string of size 5
test::RandomString(&random, 5, &buf);
Blob r5;
ASSERT_OK(blob_store.Put(buf.data(), 70, &r5));
// now you get to use the second file
for (size_t i = 0; i < r5.chunks.size(); ++i) {
ASSERT_EQ(r5.chunks[0].bucket_id, 1u);
}
}
TEST(BlobStoreTest, CreateAndStoreTest) {
const uint64_t block_size = 10;
const uint32_t blocks_per_file = 1000;
const int max_blurb_size = 300;
Random random(5);
BlobStore blob_store(test::TmpDir() + "/blob_store_test",
block_size,
blocks_per_file,
Env::Default());
vector<pair<Blob, string>> ranges;
for (int i = 0; i < 20000; ++i) {
int decision = rand() % 5;
if (decision <= 2 || ranges.size() == 0) {
string buf;
int size_blocks = (rand() % max_blurb_size + 1);
int string_size = size_blocks * block_size - (rand() % block_size);
test::RandomString(&random, string_size, &buf);
Blob r;
ASSERT_OK(blob_store.Put(buf.data(), string_size, &r));
ranges.push_back(make_pair(r, buf));
} else if (decision == 3) {
int ti = rand() % ranges.size();
string out_buf;
ASSERT_OK(blob_store.Get(ranges[ti].first, &out_buf));
ASSERT_EQ(ranges[ti].second, out_buf);
} else {
int ti = rand() % ranges.size();
ASSERT_OK(blob_store.Delete(ranges[ti].first));
ranges.erase(ranges.begin() + ti);
}
}
}
} // namespace rocksdb
int main(int argc, char** argv) {
return rocksdb::test::RunAllTests();
}