diff --git a/table/block_based/block_based_table_builder.cc b/table/block_based/block_based_table_builder.cc
index f94ae8a95..17bdfc882 100644
--- a/table/block_based/block_based_table_builder.cc
+++ b/table/block_based/block_based_table_builder.cc
@@ -254,13 +254,10 @@ struct BlockBasedTableBuilder::Rep {
   std::atomic<uint64_t> offset;
   size_t alignment;
   BlockBuilder data_block;
-  // Buffers uncompressed data blocks and keys to replay later. Needed when
+  // Buffers uncompressed data blocks to replay later. Needed when
   // compression dictionary is enabled so we can finalize the dictionary before
   // compressing any data blocks.
-  // TODO(ajkr): ideally we don't buffer all keys and all uncompressed data
-  // blocks as it's redundant, but it's easier to implement for now.
-  std::vector<std::pair<std::string, std::vector<std::string>>>
-      data_block_and_keys_buffers;
+  std::vector<std::string> data_block_buffers;
   BlockBuilder range_del_block;

   InternalKeySliceTransform internal_prefix_transform;
@@ -311,8 +308,7 @@ struct BlockBasedTableBuilder::Rep {
   };
   State state;
   // `kBuffered` state is allowed only as long as the buffering of uncompressed
-  // data blocks (see `data_block_and_keys_buffers`) does not exceed
-  // `buffer_limit`.
+  // data blocks (see `data_block_buffers`) does not exceed `buffer_limit`.
   uint64_t buffer_limit;

   const bool use_delta_encoding_for_index_values;
@@ -953,12 +949,8 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
     r->last_key.assign(key.data(), key.size());
     r->data_block.Add(key, value);
     if (r->state == Rep::State::kBuffered) {
-      // Buffer keys to be replayed during `Finish()` once compression
-      // dictionary has been finalized.
-      if (r->data_block_and_keys_buffers.empty() || should_flush) {
-        r->data_block_and_keys_buffers.emplace_back();
-      }
-      r->data_block_and_keys_buffers.back().second.emplace_back(key.ToString());
+      // Buffered keys will be replayed from data_block_buffers during
+      // `Finish()` once compression dictionary has been finalized.
     } else {
       if (!r->IsParallelCompressionEnabled()) {
         r->index_builder->OnKeyAdded(key);
@@ -1019,11 +1011,8 @@ void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block,
   block->SwapAndReset(raw_block_contents);
   if (rep_->state == Rep::State::kBuffered) {
     assert(is_data_block);
-    assert(!rep_->data_block_and_keys_buffers.empty());
-    rep_->data_block_and_keys_buffers.back().first =
-        std::move(raw_block_contents);
-    rep_->data_begin_offset +=
-        rep_->data_block_and_keys_buffers.back().first.size();
+    rep_->data_block_buffers.emplace_back(std::move(raw_block_contents));
+    rep_->data_begin_offset += rep_->data_block_buffers.back().size();
     return;
   }
   WriteBlock(raw_block_contents, handle, is_data_block);
@@ -1695,7 +1684,7 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
   const size_t kSampleBytes = r->compression_opts.zstd_max_train_bytes > 0
                                   ? r->compression_opts.zstd_max_train_bytes
                                   : r->compression_opts.max_dict_bytes;
-  const size_t kNumBlocksBuffered = r->data_block_and_keys_buffers.size();
+  const size_t kNumBlocksBuffered = r->data_block_buffers.size();
   if (kNumBlocksBuffered == 0) {
     // The below code is neither safe nor necessary for handling zero data
     // blocks.
@@ -1725,11 +1714,10 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
   for (size_t i = 0;
        i < kNumBlocksBuffered && compression_dict_samples.size() < kSampleBytes;
        ++i) {
-    size_t copy_len =
-        std::min(kSampleBytes - compression_dict_samples.size(),
-                 r->data_block_and_keys_buffers[buffer_idx].first.size());
-    compression_dict_samples.append(
-        r->data_block_and_keys_buffers[buffer_idx].first, 0, copy_len);
+    size_t copy_len = std::min(kSampleBytes - compression_dict_samples.size(),
+                               r->data_block_buffers[buffer_idx].size());
+    compression_dict_samples.append(r->data_block_buffers[buffer_idx], 0,
+                                    copy_len);
     compression_dict_sample_lens.emplace_back(copy_len);

     buffer_idx += kPrimeGeneratorRemainder;
@@ -1754,30 +1742,58 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
       dict, r->compression_type == kZSTD ||
                 r->compression_type == kZSTDNotFinalCompression));

-  for (size_t i = 0; ok() && i < r->data_block_and_keys_buffers.size(); ++i) {
-    auto& data_block = r->data_block_and_keys_buffers[i].first;
-    auto& keys = r->data_block_and_keys_buffers[i].second;
+  auto get_iterator_for_block = [&r](size_t i) {
+    auto& data_block = r->data_block_buffers[i];
     assert(!data_block.empty());
-    assert(!keys.empty());
+
+    Block reader{BlockContents{data_block}};
+    DataBlockIter* iter = reader.NewDataIterator(
+        r->internal_comparator.user_comparator(), kDisableGlobalSequenceNumber);
+
+    iter->SeekToFirst();
+    assert(iter->Valid());
+    return std::unique_ptr<DataBlockIter>(iter);
+  };
+
+  std::unique_ptr<DataBlockIter> iter = nullptr, next_block_iter = nullptr;
+
+  for (size_t i = 0; ok() && i < r->data_block_buffers.size(); ++i) {
+    if (iter == nullptr) {
+      iter = get_iterator_for_block(i);
+      assert(iter != nullptr);
+    };
+
+    if (i + 1 < r->data_block_buffers.size()) {
+      next_block_iter = get_iterator_for_block(i + 1);
+    }
+
+    auto& data_block = r->data_block_buffers[i];

     if (r->IsParallelCompressionEnabled()) {
       Slice first_key_in_next_block;
       const Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
-      if (i + 1 < r->data_block_and_keys_buffers.size()) {
-        first_key_in_next_block =
-            r->data_block_and_keys_buffers[i + 1].second.front();
+      if (i + 1 < r->data_block_buffers.size()) {
+        assert(next_block_iter != nullptr);
+        first_key_in_next_block = next_block_iter->key();
       } else {
         first_key_in_next_block_ptr = r->first_key_in_next_block;
       }

+      std::vector<std::string> keys;
+      for (; iter->Valid(); iter->Next()) {
+        keys.emplace_back(iter->key().ToString());
+      }
+
       ParallelCompressionRep::BlockRep* block_rep = r->pc_rep->PrepareBlock(
           r->compression_type, first_key_in_next_block_ptr, &data_block,
           &keys);
+      assert(block_rep != nullptr);
       r->pc_rep->file_size_estimator.EmitBlock(block_rep->data->size(),
                                                r->get_offset());
       r->pc_rep->EmitBlock(block_rep);
     } else {
-      for (const auto& key : keys) {
+      for (; iter->Valid(); iter->Next()) {
+        Slice key = iter->key();
         if (r->filter_builder != nullptr) {
           size_t ts_sz =
               r->internal_comparator.user_comparator()->timestamp_size();
@@ -1787,16 +1803,22 @@ void BlockBasedTableBuilder::EnterUnbuffered() {
       }
       WriteBlock(Slice(data_block), &r->pending_handle,
                  true /* is_data_block */);
-      if (ok() && i + 1 < r->data_block_and_keys_buffers.size()) {
-        Slice first_key_in_next_block =
-            r->data_block_and_keys_buffers[i + 1].second.front();
+      if (ok() && i + 1 < r->data_block_buffers.size()) {
+        assert(next_block_iter != nullptr);
+        Slice first_key_in_next_block = next_block_iter->key();
+
         Slice* first_key_in_next_block_ptr = &first_key_in_next_block;
-        r->index_builder->AddIndexEntry(
-            &keys.back(), first_key_in_next_block_ptr, r->pending_handle);
+
+        iter->SeekToLast();
+        std::string last_key = iter->key().ToString();
+        r->index_builder->AddIndexEntry(&last_key, first_key_in_next_block_ptr,
+                                        r->pending_handle);
       }
     }
+
+    std::swap(iter, next_block_iter);
   }
-  r->data_block_and_keys_buffers.clear();
+  r->data_block_buffers.clear();
 }

 Status BlockBasedTableBuilder::Finish() {
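Note on the replay pattern: after this change the builder keeps only the serialized block bytes in `data_block_buffers` and re-derives every key on replay by iterating the block (`Block::NewDataIterator` in the hunks above), instead of carrying a parallel vector of key copies. The following self-contained C++ sketch illustrates the idea with an invented length-prefixed record format; `ToyBlockBuilder`, `ToyBlockIter`, and the format itself are hypothetical stand-ins for RocksDB's data block encoding and `DataBlockIter`, not the library's API.

#include <cassert>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>

// Toy stand-in for BlockBuilder: serializes keys as [4-byte length][bytes].
class ToyBlockBuilder {
 public:
  void Add(const std::string& key) {
    uint32_t len = static_cast<uint32_t>(key.size());
    buf_.append(reinterpret_cast<const char*>(&len), sizeof(len));
    buf_.append(key);
  }
  // Hands back the serialized block; this is all we buffer.
  std::string Finish() { return std::move(buf_); }

 private:
  std::string buf_;
};

// Toy stand-in for DataBlockIter: re-derives keys from the serialized bytes.
class ToyBlockIter {
 public:
  explicit ToyBlockIter(const std::string& block) : block_(block) {}
  void SeekToFirst() {
    pos_ = 0;
    ParseCurrent();
  }
  bool Valid() const { return valid_; }
  void Next() {
    pos_ += sizeof(uint32_t) + key_.size();
    ParseCurrent();
  }
  const std::string& key() const {
    assert(valid_);
    return key_;
  }

 private:
  // Decodes the record at pos_, or marks the iterator invalid at end of block.
  void ParseCurrent() {
    valid_ = pos_ + sizeof(uint32_t) <= block_.size();
    if (!valid_) return;
    uint32_t len;
    std::memcpy(&len, block_.data() + pos_, sizeof(len));
    assert(pos_ + sizeof(len) + len <= block_.size());
    key_.assign(block_.data() + pos_ + sizeof(len), len);
  }

  const std::string& block_;
  size_t pos_ = 0;
  std::string key_;
  bool valid_ = false;
};

int main() {
  // Buffer only serialized blocks, as data_block_buffers does after the patch;
  // no parallel vector of key copies is kept.
  std::vector<std::string> data_block_buffers;
  ToyBlockBuilder b1, b2;
  b1.Add("apple");
  b1.Add("banana");
  b2.Add("cherry");
  data_block_buffers.emplace_back(b1.Finish());
  data_block_buffers.emplace_back(b2.Finish());

  // Replay: recover every key from the bytes alone, the way EnterUnbuffered()
  // now walks each buffered block with a DataBlockIter.
  for (const std::string& block : data_block_buffers) {
    ToyBlockIter iter(block);
    for (iter.SeekToFirst(); iter.Valid(); iter.Next()) {
      std::cout << iter.key() << "\n";
    }
  }
  return 0;
}

The same iterators also supply what the index needs at replay time: `SeekToLast()` on block i yields the `last_key` passed to `AddIndexEntry`, and the first key of block i + 1 supplies `first_key_in_next_block`, which is what lets the buffered key vectors be dropped entirely.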