diff --git a/util/blob_store.cc b/util/blob_store.cc index a30d48e66..f1ad620e9 100644 --- a/util/blob_store.cc +++ b/util/blob_store.cc @@ -106,21 +106,27 @@ bool FreeList::Overlap(const Blob &blob) const { BlobStore::BlobStore(const string& directory, uint64_t block_size, uint32_t blocks_per_bucket, + uint32_t max_buckets, Env* env) : directory_(directory), block_size_(block_size), blocks_per_bucket_(blocks_per_bucket), - env_(env) { + env_(env), + max_buckets_(max_buckets) { env_->CreateDirIfMissing(directory_); storage_options_.use_mmap_writes = false; storage_options_.use_mmap_reads = false; + buckets_size_ = 0; + buckets_ = new unique_ptr[max_buckets_]; + CreateNewBucket(); } BlobStore::~BlobStore() { // TODO we don't care about recovery for now + delete [] buckets_; } Status BlobStore::Put(const Slice& value, Blob* blob) { @@ -129,13 +135,12 @@ Status BlobStore::Put(const Slice& value, Blob* blob) { if (!s.ok()) { return s; } - ReadLock l(&buckets_mutex_); size_t size_left = value.size(); uint64_t offset = 0; // in bytes, not blocks for (auto chunk : blob->chunks) { uint64_t write_size = min(chunk.size * block_size_, size_left); - assert(chunk.bucket_id < buckets_.size()); + assert(chunk.bucket_id < buckets_size_); s = buckets_[chunk.bucket_id].get()->Write(chunk.offset * block_size_, Slice(value.data() + offset, write_size)); @@ -164,18 +169,19 @@ Status BlobStore::Put(const Slice& value, Blob* blob) { Status BlobStore::Get(const Blob& blob, string* value) const { - ReadLock l(&buckets_mutex_); - - // assert that it doesn't overlap with free list - // it will get compiled out for release - assert(!free_list_.Overlap(blob)); + { + // assert that it doesn't overlap with free list + // it will get compiled out for release + MutexLock l(&free_list_mutex_); + assert(!free_list_.Overlap(blob)); + } value->resize(blob.Size() * block_size_); uint64_t offset = 0; // in bytes, not blocks for (auto chunk : blob.chunks) { Slice result; - assert(chunk.bucket_id < 
buckets_.size()); + assert(chunk.bucket_id < buckets_size_); Status s; s = buckets_[chunk.bucket_id].get()->Read(chunk.offset * block_size_, chunk.size * block_size_, @@ -200,8 +206,7 @@ Status BlobStore::Delete(const Blob& blob) { } Status BlobStore::Sync() { - ReadLock l(&buckets_mutex_); - for (size_t i = 0; i < buckets_.size(); ++i) { + for (size_t i = 0; i < buckets_size_; ++i) { Status s = buckets_[i].get()->Sync(); if (!s.ok()) { return s; @@ -228,10 +233,13 @@ Status BlobStore::Allocate(uint32_t blocks, Blob* blob) { // called with free_list_mutex_ held Status BlobStore::CreateNewBucket() { - WriteLock l(&buckets_mutex_); - int new_bucket_id; - new_bucket_id = buckets_.size(); - buckets_.push_back(unique_ptr()); + MutexLock l(&buckets_mutex_); + + if (buckets_size_ >= max_buckets_) { + return Status::IOError("Max size exceeded\n"); + } + + int new_bucket_id = buckets_size_; char fname[200]; sprintf(fname, "%s/%d.bs", directory_.c_str(), new_bucket_id); @@ -240,7 +248,6 @@ Status BlobStore::CreateNewBucket() { &buckets_[new_bucket_id], storage_options_); if (!s.ok()) { - buckets_.erase(buckets_.begin() + new_bucket_id); return s; } @@ -249,6 +256,8 @@ Status BlobStore::CreateNewBucket() { // (also, tmpfs does not support allocate) buckets_[new_bucket_id].get()->Allocate(0, block_size_ * blocks_per_bucket_); + buckets_size_ = new_bucket_id + 1; + return free_list_.Free(Blob(new_bucket_id, 0, blocks_per_bucket_)); } diff --git a/util/blob_store.h b/util/blob_store.h index be9947216..0a81d01df 100644 --- a/util/blob_store.h +++ b/util/blob_store.h @@ -102,10 +102,14 @@ class BlobStore { // Bucket is a device or a file that we use to store the blobs. // If we don't have enough blocks to allocate a new blob, we will // try to create a new file or device. 
+ // max_buckets - maximum number of buckets BlobStore will create + // BlobStore max size in bytes is + // max_buckets * blocks_per_bucket * block_size // env - env for creating new files BlobStore(const std::string& directory, uint64_t block_size, uint32_t blocks_per_bucket, + uint32_t max_buckets, Env* env); ~BlobStore(); @@ -134,10 +138,15 @@ class BlobStore { EnvOptions storage_options_; // protected by free_list_mutex_ FreeList free_list_; + // free_list_mutex_ is locked BEFORE buckets_mutex_ mutable port::Mutex free_list_mutex_; - // protected by buckets mutex - std::vector> buckets_; - mutable port::RWMutex buckets_mutex_; + // protected by buckets_mutex_ + // array of buckets + unique_ptr* buckets_; + // number of buckets in the array + uint32_t buckets_size_; + uint32_t max_buckets_; + mutable port::Mutex buckets_mutex_; // Calls FreeList allocate. If free list can't allocate // new blob, creates new bucket and tries again diff --git a/util/blob_store_test.cc b/util/blob_store_test.cc index 0983faece..bc4900795 100644 --- a/util/blob_store_test.cc +++ b/util/blob_store_test.cc @@ -37,6 +37,7 @@ TEST(BlobStoreTest, SanityTest) { BlobStore blob_store(test::TmpDir() + "/blob_store_test", block_size, blocks_per_file, + 1000, Env::Default()); string buf; @@ -98,6 +99,7 @@ TEST(BlobStoreTest, FragmentedChunksTest) { BlobStore blob_store(test::TmpDir() + "/blob_store_test", block_size, blocks_per_file, + 1000, Env::Default()); string buf; @@ -138,6 +140,7 @@ TEST(BlobStoreTest, CreateAndStoreTest) { BlobStore blob_store(test::TmpDir() + "/blob_store_test", block_size, blocks_per_file, + 10000, Env::Default()); vector> ranges; @@ -165,6 +168,31 @@ TEST(BlobStoreTest, CreateAndStoreTest) { ASSERT_OK(blob_store.Sync()); } +TEST(BlobStoreTest, MaxSizeTest) { + const uint64_t block_size = 10; + const uint32_t blocks_per_file = 100; + const int max_buckets = 10; + Random random(5); + + BlobStore blob_store(test::TmpDir() + "/blob_store_test", + block_size, + 
blocks_per_file, + max_buckets, + Env::Default()); + string buf; + for (int i = 0; i < max_buckets; ++i) { + test::RandomString(&random, 1000, &buf); + Blob r; + ASSERT_OK(blob_store.Put(Slice(buf), &r)); + } + + test::RandomString(&random, 1000, &buf); + Blob r; + // this Put should fail: all max_buckets buckets are already full + Status s = blob_store.Put(Slice(buf), &r); + ASSERT_EQ(s.ok(), false); +} + } // namespace rocksdb int main(int argc, char** argv) {