From cb8a7302e4a2124beb9fe404c26ff76b7ce9b803 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 23 Oct 2013 14:38:52 -0700 Subject: [PATCH] Implement max_size in BlobStore Summary: I added max_size option in blobstore. Since we now know the maximum number of buckets we'll ever use, we can allocate an array of buckets and access its elements without use of any locks! Common case Get doesn't lock anything now. Benchmarks on 16KB block size show no impact on speed, though. Test Plan: unittests + benchmark Reviewers: dhruba, haobo, kailiu Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D13641 --- util/blob_store.cc | 41 +++++++++++++++++++++++++---------------- util/blob_store.h | 15 ++++++++++++--- util/blob_store_test.cc | 28 ++++++++++++++++++++++++++++ 3 files changed, 65 insertions(+), 19 deletions(-) diff --git a/util/blob_store.cc b/util/blob_store.cc index a30d48e66..f1ad620e9 100644 --- a/util/blob_store.cc +++ b/util/blob_store.cc @@ -106,21 +106,27 @@ bool FreeList::Overlap(const Blob &blob) const { BlobStore::BlobStore(const string& directory, uint64_t block_size, uint32_t blocks_per_bucket, + uint32_t max_buckets, Env* env) : directory_(directory), block_size_(block_size), blocks_per_bucket_(blocks_per_bucket), - env_(env) { + env_(env), + max_buckets_(max_buckets) { env_->CreateDirIfMissing(directory_); storage_options_.use_mmap_writes = false; storage_options_.use_mmap_reads = false; + buckets_size_ = 0; + buckets_ = new unique_ptr[max_buckets_]; + CreateNewBucket(); } BlobStore::~BlobStore() { // TODO we don't care about recovery for now + delete [] buckets_; } Status BlobStore::Put(const Slice& value, Blob* blob) { @@ -129,13 +135,12 @@ Status BlobStore::Put(const Slice& value, Blob* blob) { if (!s.ok()) { return s; } - ReadLock l(&buckets_mutex_); size_t size_left = value.size(); uint64_t offset = 0; // in bytes, not blocks for (auto chunk : blob->chunks) { uint64_t write_size = min(chunk.size * block_size_, size_left); - assert(chunk.bucket_id < buckets_.size()); + assert(chunk.bucket_id < buckets_size_); s = buckets_[chunk.bucket_id].get()->Write(chunk.offset * block_size_, Slice(value.data() + offset, write_size)); @@ -164,18 +169,19 @@ Status BlobStore::Put(const Slice& value, Blob* blob) { Status BlobStore::Get(const Blob& blob, string* value) const { - ReadLock l(&buckets_mutex_); - - // assert that it doesn't overlap with free list - // it will get compiled out for release - assert(!free_list_.Overlap(blob)); + { + // assert that it doesn't overlap with free list + // it will get compiled out for release + MutexLock l(&free_list_mutex_); + assert(!free_list_.Overlap(blob)); + } value->resize(blob.Size() * block_size_); uint64_t offset = 0; // in bytes, not blocks for (auto chunk : blob.chunks) { Slice result; - assert(chunk.bucket_id < buckets_.size()); + assert(chunk.bucket_id < buckets_size_); Status s; s = buckets_[chunk.bucket_id].get()->Read(chunk.offset * block_size_, chunk.size * block_size_, @@ -200,8 +206,7 @@ Status BlobStore::Delete(const Blob& blob) { } Status BlobStore::Sync() { - ReadLock l(&buckets_mutex_); - for (size_t i = 0; i < buckets_.size(); ++i) { + for (size_t i = 0; i < buckets_size_; ++i) { Status s = buckets_[i].get()->Sync(); if (!s.ok()) { return s; @@ -228,10 +233,13 @@ Status BlobStore::Allocate(uint32_t blocks, Blob* blob) { // called with free_list_mutex_ held Status BlobStore::CreateNewBucket() { - WriteLock l(&buckets_mutex_); - int new_bucket_id; - new_bucket_id = buckets_.size(); - buckets_.push_back(unique_ptr()); + MutexLock l(&buckets_mutex_); + + if (buckets_size_ >= max_buckets_) { + return Status::IOError("Max size exceeded\n"); + } + + int new_bucket_id = buckets_size_; char fname[200]; sprintf(fname, "%s/%d.bs", directory_.c_str(), new_bucket_id); @@ -240,7 +248,6 @@ Status BlobStore::CreateNewBucket() { &buckets_[new_bucket_id], storage_options_); if (!s.ok()) { - buckets_.erase(buckets_.begin() + new_bucket_id); return s; } @@ -249,6 +256,8 @@ Status BlobStore::CreateNewBucket() { // (also, tmpfs does not support allocate) buckets_[new_bucket_id].get()->Allocate(0, block_size_ * blocks_per_bucket_); + buckets_size_ = new_bucket_id + 1; + return free_list_.Free(Blob(new_bucket_id, 0, blocks_per_bucket_)); } diff --git a/util/blob_store.h b/util/blob_store.h index be9947216..0a81d01df 100644 --- a/util/blob_store.h +++ b/util/blob_store.h @@ -102,10 +102,14 @@ class BlobStore { // Bucket is a device or a file that we use to store the blobs. // If we don't have enough blocks to allocate a new blob, we will // try to create a new file or device. + // max_buckets - maximum number of buckets BlobStore will create + // BlobStore max size in bytes is + // max_buckets * blocks_per_bucket * block_size // env - env for creating new files BlobStore(const std::string& directory, uint64_t block_size, uint32_t blocks_per_bucket, + uint32_t max_buckets, Env* env); ~BlobStore(); @@ -134,10 +138,15 @@ class BlobStore { EnvOptions storage_options_; // protected by free_list_mutex_ FreeList free_list_; + // free_list_mutex_ is locked BEFORE buckets_mutex_ mutable port::Mutex free_list_mutex_; - // protected by buckets mutex - std::vector> buckets_; - mutable port::RWMutex buckets_mutex_; + // protected by buckets_mutex_ + // array of buckets + unique_ptr* buckets_; + // number of buckets in the array + uint32_t buckets_size_; + uint32_t max_buckets_; + mutable port::Mutex buckets_mutex_; // Calls FreeList allocate. If free list can't allocate // new blob, creates new bucket and tries again diff --git a/util/blob_store_test.cc b/util/blob_store_test.cc index 0983faece..bc4900795 100644 --- a/util/blob_store_test.cc +++ b/util/blob_store_test.cc @@ -37,6 +37,7 @@ TEST(BlobStoreTest, SanityTest) { BlobStore blob_store(test::TmpDir() + "/blob_store_test", block_size, blocks_per_file, + 1000, Env::Default()); string buf; @@ -98,6 +99,7 @@ TEST(BlobStoreTest, FragmentedChunksTest) { BlobStore blob_store(test::TmpDir() + "/blob_store_test", block_size, blocks_per_file, + 1000, Env::Default()); string buf; @@ -138,6 +140,7 @@ TEST(BlobStoreTest, CreateAndStoreTest) { BlobStore blob_store(test::TmpDir() + "/blob_store_test", block_size, blocks_per_file, + 10000, Env::Default()); vector> ranges; @@ -165,6 +168,31 @@ TEST(BlobStoreTest, CreateAndStoreTest) { ASSERT_OK(blob_store.Sync()); } +TEST(BlobStoreTest, MaxSizeTest) { + const uint64_t block_size = 10; + const uint32_t blocks_per_file = 100; + const int max_buckets = 10; + Random random(5); + + BlobStore blob_store(test::TmpDir() + "/blob_store_test", + block_size, + blocks_per_file, + max_buckets, + Env::Default()); + string buf; + for (int i = 0; i < max_buckets; ++i) { + test::RandomString(&random, 1000, &buf); + Blob r; + ASSERT_OK(blob_store.Put(Slice(buf), &r)); + } + + test::RandomString(&random, 1000, &buf); + Blob r; + // should fail because max size + Status s = blob_store.Put(Slice(buf), &r); + ASSERT_EQ(s.ok(), false); +} + } // namespace rocksdb int main(int argc, char** argv) {