diff options
Diffstat (limited to 'icing/index/numeric/integer-index-storage.cc')
-rw-r--r-- | icing/index/numeric/integer-index-storage.cc | 254 |
1 files changed, 176 insertions, 78 deletions
diff --git a/icing/index/numeric/integer-index-storage.cc b/icing/index/numeric/integer-index-storage.cc index fa62b19..72e0266 100644 --- a/icing/index/numeric/integer-index-storage.cc +++ b/icing/index/numeric/integer-index-storage.cc @@ -45,6 +45,7 @@ #include "icing/index/numeric/posting-list-integer-index-serializer.h" #include "icing/schema/section.h" #include "icing/store/document-id.h" +#include "icing/util/crc32.h" #include "icing/util/status-macros.h" namespace icing { @@ -151,18 +152,25 @@ class BucketPostingListIterator { : pl_accessor_(std::move(pl_accessor)), should_retrieve_next_batch_(true) {} + struct AdvanceAndFilterResult { + libtextclassifier3::Status status = libtextclassifier3::Status::OK; + int32_t num_advance_calls = 0; + int32_t num_blocks_inspected = 0; + }; // Advances to the next relevant data. The posting list of a bucket contains // keys within range [bucket.key_lower, bucket.key_upper], but some of them // may be out of [query_key_lower, query_key_upper], so when advancing we have // to filter out those non-relevant keys. // // Returns: + // AdvanceAndFilterResult. status will be: // - OK on success // - RESOURCE_EXHAUSTED_ERROR if reaching the end (i.e. no more relevant // data) // - Any other PostingListIntegerIndexAccessor errors - libtextclassifier3::Status AdvanceAndFilter(int64_t query_key_lower, - int64_t query_key_upper) { + AdvanceAndFilterResult AdvanceAndFilter(int64_t query_key_lower, + int64_t query_key_upper) { + AdvanceAndFilterResult result; // Move curr_ until reaching a relevant data (i.e. key in range // [query_key_lower, query_key_upper]) do { @@ -172,12 +180,18 @@ class BucketPostingListIterator { curr_ >= cached_batch_integer_index_data_.cend(); } if (should_retrieve_next_batch_) { - ICING_RETURN_IF_ERROR(GetNextDataBatch()); + auto status = GetNextDataBatch(); + if (!status.ok()) { + result.status = std::move(status); + return result; + } + ++result.num_blocks_inspected; should_retrieve_next_batch_ = false; } + ++result.num_advance_calls; } while (curr_->key() < query_key_lower || curr_->key() > query_key_upper); - return libtextclassifier3::Status::OK; + return result; } const BasicHit& GetCurrentBasicHit() const { return curr_->basic_hit(); } @@ -222,7 +236,9 @@ class IntegerIndexStorageIterator : public NumericIndex<int64_t>::Iterator { explicit IntegerIndexStorageIterator( int64_t query_key_lower, int64_t query_key_upper, std::vector<std::unique_ptr<BucketPostingListIterator>>&& bucket_pl_iters) - : NumericIndex<int64_t>::Iterator(query_key_lower, query_key_upper) { + : NumericIndex<int64_t>::Iterator(query_key_lower, query_key_upper), + num_advance_calls_(0), + num_blocks_inspected_(0) { std::vector<BucketPostingListIterator*> bucket_pl_iters_raw_ptrs; for (std::unique_ptr<BucketPostingListIterator>& bucket_pl_itr : bucket_pl_iters) { @@ -232,11 +248,15 @@ class IntegerIndexStorageIterator : public NumericIndex<int64_t>::Iterator { // Note: it is possible that the bucket iterator fails to advance for the // first round, because data could be filtered out by [query_key_lower, // query_key_upper]. In this case, just discard the iterator. - if (bucket_pl_itr->AdvanceAndFilter(query_key_lower, query_key_upper) - .ok()) { + BucketPostingListIterator::AdvanceAndFilterResult + advance_and_filter_result = + bucket_pl_itr->AdvanceAndFilter(query_key_lower, query_key_upper); + if (advance_and_filter_result.status.ok()) { bucket_pl_iters_raw_ptrs.push_back(bucket_pl_itr.get()); bucket_pl_iters_.push_back(std::move(bucket_pl_itr)); } + num_advance_calls_ += advance_and_filter_result.num_advance_calls; + num_blocks_inspected_ += advance_and_filter_result.num_blocks_inspected; } pq_ = std::priority_queue<BucketPostingListIterator*, @@ -259,6 +279,12 @@ class IntegerIndexStorageIterator : public NumericIndex<int64_t>::Iterator { DocHitInfo GetDocHitInfo() const override { return doc_hit_info_; } + int32_t GetNumAdvanceCalls() const override { return num_advance_calls_; } + + int32_t GetNumBlocksInspected() const override { + return num_blocks_inspected_; + } + private: BucketPostingListIterator::Comparator comparator_; @@ -280,6 +306,9 @@ class IntegerIndexStorageIterator : public NumericIndex<int64_t>::Iterator { pq_; DocHitInfo doc_hit_info_; + + int32_t num_advance_calls_; + int32_t num_blocks_inspected_; }; libtextclassifier3::Status IntegerIndexStorageIterator::Advance() { @@ -299,7 +328,12 @@ libtextclassifier3::Status IntegerIndexStorageIterator::Advance() { do { doc_hit_info_.UpdateSection( bucket_itr->GetCurrentBasicHit().section_id()); - advance_status = bucket_itr->AdvanceAndFilter(key_lower_, key_upper_); + BucketPostingListIterator::AdvanceAndFilterResult + advance_and_filter_result = + bucket_itr->AdvanceAndFilter(key_lower_, key_upper_); + advance_status = std::move(advance_and_filter_result.status); + num_advance_calls_ += advance_and_filter_result.num_advance_calls; + num_blocks_inspected_ += advance_and_filter_result.num_blocks_inspected; } while (advance_status.ok() && bucket_itr->GetCurrentBasicHit().document_id() == document_id); if (advance_status.ok()) { @@ -311,6 +345,11 @@ libtextclassifier3::Status IntegerIndexStorageIterator::Advance() { } bool IntegerIndexStorage::Options::IsValid() const { + if (num_data_threshold_for_bucket_split <= + kMinNumDataThresholdForBucketSplit) { + return false; + } + if (!HasCustomInitBuckets()) { return true; } @@ -403,12 +442,20 @@ libtextclassifier3::Status IntegerIndexStorage::AddKeys( return libtextclassifier3::Status::OK; } + SetDirty(); + std::sort(new_keys.begin(), new_keys.end()); // Dedupe auto last = std::unique(new_keys.begin(), new_keys.end()); new_keys.erase(last, new_keys.end()); + if (static_cast<int32_t>(new_keys.size()) > + std::numeric_limits<int32_t>::max() - info().num_data) { + return absl_ports::ResourceExhaustedError( + "# of keys in this integer index storage exceed the limit"); + } + // When adding keys into a bucket, we potentially split it into 2 new buckets // and one of them will be added into the unsorted bucket array. // When handling keys belonging to buckets in the unsorted bucket array, we @@ -649,6 +696,9 @@ libtextclassifier3::Status IntegerIndexStorage::TransferIndex( return lhs.get() < rhs.get(); }); + const int32_t num_data_threshold_for_bucket_merge = + kNumDataThresholdRatioForBucketMerge * + new_storage->options_.num_data_threshold_for_bucket_split; int64_t curr_key_lower = std::numeric_limits<int64_t>::min(); int64_t curr_key_upper = std::numeric_limits<int64_t>::min(); std::vector<IntegerIndexData> accumulated_data; @@ -687,7 +737,7 @@ libtextclassifier3::Status IntegerIndexStorage::TransferIndex( // - Flush accumulated_data and create a new bucket for them. // - OR merge new_data into accumulated_data and go to the next round. if (!accumulated_data.empty() && accumulated_data.size() + new_data.size() > - kNumDataThresholdForBucketMerge) { + num_data_threshold_for_bucket_merge) { // TODO(b/259743562): [Optimization 3] adjust upper bound to fit more data // from new_data to accumulated_data. ICING_RETURN_IF_ERROR(FlushDataIntoNewSortedBucket( @@ -879,9 +929,11 @@ IntegerIndexStorage::InitializeExistingFiles( IntegerIndexStorage::FlushDataIntoNewSortedBucket( int64_t key_lower, int64_t key_upper, std::vector<IntegerIndexData>&& data, IntegerIndexStorage* storage) { + storage->SetDirty(); + if (data.empty()) { - return storage->sorted_buckets_->Append( - Bucket(key_lower, key_upper, PostingListIdentifier::kInvalid)); + return storage->sorted_buckets_->Append(Bucket( + key_lower, key_upper, PostingListIdentifier::kInvalid, /*num_data=*/0)); } ICING_ASSIGN_OR_RETURN( @@ -891,10 +943,16 @@ IntegerIndexStorage::FlushDataIntoNewSortedBucket( data.end())); storage->info().num_data += data.size(); - return storage->sorted_buckets_->Append(Bucket(key_lower, key_upper, pl_id)); + return storage->sorted_buckets_->Append( + Bucket(key_lower, key_upper, pl_id, data.size())); } -libtextclassifier3::Status IntegerIndexStorage::PersistStoragesToDisk() { +libtextclassifier3::Status IntegerIndexStorage::PersistStoragesToDisk( + bool force) { + if (!force && !is_storage_dirty()) { + return libtextclassifier3::Status::OK; + } + ICING_RETURN_IF_ERROR(sorted_buckets_->PersistToDisk()); ICING_RETURN_IF_ERROR(unsorted_buckets_->PersistToDisk()); if (!flash_index_storage_->PersistToDisk()) { @@ -904,19 +962,35 @@ libtextclassifier3::Status IntegerIndexStorage::PersistStoragesToDisk() { return libtextclassifier3::Status::OK; } -libtextclassifier3::Status IntegerIndexStorage::PersistMetadataToDisk() { +libtextclassifier3::Status IntegerIndexStorage::PersistMetadataToDisk( + bool force) { + // We can skip persisting metadata to disk only if both info and storage are + // clean. + if (!force && !is_info_dirty() && !is_storage_dirty()) { + return libtextclassifier3::Status::OK; + } + // Changes should have been applied to the underlying file when using // MemoryMappedFile::Strategy::READ_WRITE_AUTO_SYNC, but call msync() as an // extra safety step to ensure they are written out. return metadata_mmapped_file_->PersistToDisk(); } -libtextclassifier3::StatusOr<Crc32> IntegerIndexStorage::ComputeInfoChecksum() { +libtextclassifier3::StatusOr<Crc32> IntegerIndexStorage::ComputeInfoChecksum( + bool force) { + if (!force && !is_info_dirty()) { + return Crc32(crcs().component_crcs.info_crc); + } + return info().ComputeChecksum(); } libtextclassifier3::StatusOr<Crc32> -IntegerIndexStorage::ComputeStoragesChecksum() { +IntegerIndexStorage::ComputeStoragesChecksum(bool force) { + if (!force && !is_storage_dirty()) { + return Crc32(crcs().component_crcs.storages_crc); + } + // Compute crcs ICING_ASSIGN_OR_RETURN(Crc32 sorted_buckets_crc, sorted_buckets_->ComputeChecksum()); @@ -933,6 +1007,89 @@ IntegerIndexStorage::AddKeysIntoBucketAndSplitIfNecessary( const std::vector<int64_t>::const_iterator& it_start, const std::vector<int64_t>::const_iterator& it_end, FileBackedVector<Bucket>::MutableView& mutable_bucket) { + int32_t num_data_in_bucket = mutable_bucket.Get().num_data(); + int32_t num_new_data = std::distance(it_start, it_end); + if (mutable_bucket.Get().key_lower() < mutable_bucket.Get().key_upper() && + num_new_data + num_data_in_bucket > + options_.num_data_threshold_for_bucket_split) { + // Split bucket. + + // 1. Read all data and free all posting lists. + std::vector<IntegerIndexData> all_data; + if (mutable_bucket.Get().posting_list_identifier().is_valid()) { + ICING_ASSIGN_OR_RETURN( + std::unique_ptr<PostingListIntegerIndexAccessor> pl_accessor, + PostingListIntegerIndexAccessor::CreateFromExisting( + flash_index_storage_.get(), posting_list_serializer_, + mutable_bucket.Get().posting_list_identifier())); + ICING_ASSIGN_OR_RETURN(all_data, pl_accessor->GetAllDataAndFree()); + } + + // 2. Append all new data. + all_data.reserve(all_data.size() + num_new_data); + for (auto it = it_start; it != it_end; ++it) { + all_data.push_back(IntegerIndexData(section_id, document_id, *it)); + } + + // 3. Run bucket splitting algorithm to decide new buckets and dispatch + // data. + // - # of data in a full bucket = + // options_.num_data_threshold_for_bucket_split. + // - Bucket splitting logic will be invoked if adding new data + // (num_new_data >= 1) into a full bucket. + // - In order to achieve good (amortized) time complexity, we want # of + // data in new buckets to be around half_of_threshold (i.e. + // options_.num_data_threshold_for_bucket_split / 2). + // - Using half_of_threshold as the cutoff threshold will cause splitting + // buckets with [half_of_threshold, half_of_threshold, num_new_data] + // data, which is not ideal because num_new_data is usually small. + // - Thus, we pick (half_of_threshold + kNumDataAfterSplitAdjustment) as + // the cutoff threshold to avoid over-splitting. It can tolerate + // num_new_data up to (2 * kNumDataAfterSplitAdjustment) and + // split only 2 buckets (instead of 3) with + // [half_of_threshold + kNumDataAfterSplitAdjustment, + // half_of_threshold + (kNumDataAfterSplitAdjustment - num_new_data)]. + int32_t cutoff_threshold = + options_.num_data_threshold_for_bucket_split / 2 + + kNumDataAfterSplitAdjustment; + std::vector<integer_index_bucket_util::DataRangeAndBucketInfo> + new_bucket_infos = integer_index_bucket_util::Split( + all_data, mutable_bucket.Get().key_lower(), + mutable_bucket.Get().key_upper(), cutoff_threshold); + if (new_bucket_infos.empty()) { + ICING_LOG(WARNING) + << "No buckets after splitting. This should not happen."; + return absl_ports::InternalError("Split error"); + } + + // 4. Flush data and create new buckets. + std::vector<Bucket> new_buckets; + for (int i = 0; i < new_bucket_infos.size(); ++i) { + int32_t num_data_in_new_bucket = + std::distance(new_bucket_infos[i].start, new_bucket_infos[i].end); + ICING_ASSIGN_OR_RETURN( + PostingListIdentifier pl_id, + FlushDataIntoPostingLists( + flash_index_storage_.get(), posting_list_serializer_, + new_bucket_infos[i].start, new_bucket_infos[i].end)); + if (i == 0) { + // Reuse mutable_bucket + mutable_bucket.Get().set_key_lower(new_bucket_infos[i].key_lower); + mutable_bucket.Get().set_key_upper(new_bucket_infos[i].key_upper); + mutable_bucket.Get().set_posting_list_identifier(pl_id); + mutable_bucket.Get().set_num_data(num_data_in_new_bucket); + } else { + new_buckets.push_back(Bucket(new_bucket_infos[i].key_lower, + new_bucket_infos[i].key_upper, pl_id, + num_data_in_new_bucket)); + } + } + + return new_buckets; + } + + // Otherwise, we don't need to split bucket. Just simply add all new data into + // the bucket. std::unique_ptr<PostingListIntegerIndexAccessor> pl_accessor; if (mutable_bucket.Get().posting_list_identifier().is_valid()) { ICING_ASSIGN_OR_RETURN( @@ -946,68 +1103,6 @@ IntegerIndexStorage::AddKeysIntoBucketAndSplitIfNecessary( } for (auto it = it_start; it != it_end; ++it) { - if (mutable_bucket.Get().key_lower() < mutable_bucket.Get().key_upper() && - pl_accessor->WantsSplit()) { - // If the bucket needs split (max size and full) and is splittable, then - // we perform bucket splitting. - - // 1. Finalize the current posting list accessor. - PostingListAccessor::FinalizeResult result = - std::move(*pl_accessor).Finalize(); - if (!result.status.ok()) { - return result.status; - } - - // 2. Create another posting list accessor instance. Read all data and - // free all posting lists. - ICING_ASSIGN_OR_RETURN( - pl_accessor, - PostingListIntegerIndexAccessor::CreateFromExisting( - flash_index_storage_.get(), posting_list_serializer_, result.id)); - ICING_ASSIGN_OR_RETURN(std::vector<IntegerIndexData> all_data, - pl_accessor->GetAllDataAndFree()); - - // 3. Append all remaining new data. - all_data.reserve(all_data.size() + std::distance(it, it_end)); - for (; it != it_end; ++it) { - all_data.push_back(IntegerIndexData(section_id, document_id, *it)); - } - - // 4. Run bucket splitting algorithm to decide new buckets and dispatch - // data. - std::vector<integer_index_bucket_util::DataRangeAndBucketInfo> - new_bucket_infos = integer_index_bucket_util::Split( - all_data, mutable_bucket.Get().key_lower(), - mutable_bucket.Get().key_upper(), - kNumDataThresholdForBucketSplit); - if (new_bucket_infos.empty()) { - ICING_LOG(WARNING) - << "No buckets after splitting. This should not happen."; - return absl_ports::InternalError("Split error"); - } - - // 5. Flush data. - std::vector<Bucket> new_buckets; - for (int i = 0; i < new_bucket_infos.size(); ++i) { - ICING_ASSIGN_OR_RETURN( - PostingListIdentifier pl_id, - FlushDataIntoPostingLists( - flash_index_storage_.get(), posting_list_serializer_, - new_bucket_infos[i].start, new_bucket_infos[i].end)); - if (i == 0) { - // Reuse mutable_bucket - mutable_bucket.Get().set_key_lower(new_bucket_infos[i].key_lower); - mutable_bucket.Get().set_key_upper(new_bucket_infos[i].key_upper); - mutable_bucket.Get().set_posting_list_identifier(pl_id); - } else { - new_buckets.push_back(Bucket(new_bucket_infos[i].key_lower, - new_bucket_infos[i].key_upper, pl_id)); - } - } - - return new_buckets; - } - ICING_RETURN_IF_ERROR(pl_accessor->PrependData( IntegerIndexData(section_id, document_id, *it))); } @@ -1022,6 +1117,9 @@ IntegerIndexStorage::AddKeysIntoBucketAndSplitIfNecessary( } mutable_bucket.Get().set_posting_list_identifier(result.id); + // We've already verified num_new_data won't exceed the limit of the entire + // storage, so it is safe to add to the counter of the bucket. + mutable_bucket.Get().set_num_data(num_data_in_bucket + num_new_data); return std::vector<Bucket>(); } |