aboutsummaryrefslogtreecommitdiff
path: root/icing/index/numeric/integer-index-storage.cc
diff options
context:
space:
mode:
Diffstat (limited to 'icing/index/numeric/integer-index-storage.cc')
-rw-r--r--icing/index/numeric/integer-index-storage.cc254
1 files changed, 176 insertions, 78 deletions
diff --git a/icing/index/numeric/integer-index-storage.cc b/icing/index/numeric/integer-index-storage.cc
index fa62b19..72e0266 100644
--- a/icing/index/numeric/integer-index-storage.cc
+++ b/icing/index/numeric/integer-index-storage.cc
@@ -45,6 +45,7 @@
#include "icing/index/numeric/posting-list-integer-index-serializer.h"
#include "icing/schema/section.h"
#include "icing/store/document-id.h"
+#include "icing/util/crc32.h"
#include "icing/util/status-macros.h"
namespace icing {
@@ -151,18 +152,25 @@ class BucketPostingListIterator {
: pl_accessor_(std::move(pl_accessor)),
should_retrieve_next_batch_(true) {}
+ struct AdvanceAndFilterResult {
+ libtextclassifier3::Status status = libtextclassifier3::Status::OK;
+ int32_t num_advance_calls = 0;
+ int32_t num_blocks_inspected = 0;
+ };
// Advances to the next relevant data. The posting list of a bucket contains
// keys within range [bucket.key_lower, bucket.key_upper], but some of them
// may be out of [query_key_lower, query_key_upper], so when advancing we have
// to filter out those non-relevant keys.
//
// Returns:
+ // AdvanceAndFilterResult. status will be:
// - OK on success
// - RESOURCE_EXHAUSTED_ERROR if reaching the end (i.e. no more relevant
// data)
// - Any other PostingListIntegerIndexAccessor errors
- libtextclassifier3::Status AdvanceAndFilter(int64_t query_key_lower,
- int64_t query_key_upper) {
+ AdvanceAndFilterResult AdvanceAndFilter(int64_t query_key_lower,
+ int64_t query_key_upper) {
+ AdvanceAndFilterResult result;
// Move curr_ until reaching a relevant data (i.e. key in range
// [query_key_lower, query_key_upper])
do {
@@ -172,12 +180,18 @@ class BucketPostingListIterator {
curr_ >= cached_batch_integer_index_data_.cend();
}
if (should_retrieve_next_batch_) {
- ICING_RETURN_IF_ERROR(GetNextDataBatch());
+ auto status = GetNextDataBatch();
+ if (!status.ok()) {
+ result.status = std::move(status);
+ return result;
+ }
+ ++result.num_blocks_inspected;
should_retrieve_next_batch_ = false;
}
+ ++result.num_advance_calls;
} while (curr_->key() < query_key_lower || curr_->key() > query_key_upper);
- return libtextclassifier3::Status::OK;
+ return result;
}
const BasicHit& GetCurrentBasicHit() const { return curr_->basic_hit(); }
@@ -222,7 +236,9 @@ class IntegerIndexStorageIterator : public NumericIndex<int64_t>::Iterator {
explicit IntegerIndexStorageIterator(
int64_t query_key_lower, int64_t query_key_upper,
std::vector<std::unique_ptr<BucketPostingListIterator>>&& bucket_pl_iters)
- : NumericIndex<int64_t>::Iterator(query_key_lower, query_key_upper) {
+ : NumericIndex<int64_t>::Iterator(query_key_lower, query_key_upper),
+ num_advance_calls_(0),
+ num_blocks_inspected_(0) {
std::vector<BucketPostingListIterator*> bucket_pl_iters_raw_ptrs;
for (std::unique_ptr<BucketPostingListIterator>& bucket_pl_itr :
bucket_pl_iters) {
@@ -232,11 +248,15 @@ class IntegerIndexStorageIterator : public NumericIndex<int64_t>::Iterator {
// Note: it is possible that the bucket iterator fails to advance for the
// first round, because data could be filtered out by [query_key_lower,
// query_key_upper]. In this case, just discard the iterator.
- if (bucket_pl_itr->AdvanceAndFilter(query_key_lower, query_key_upper)
- .ok()) {
+ BucketPostingListIterator::AdvanceAndFilterResult
+ advance_and_filter_result =
+ bucket_pl_itr->AdvanceAndFilter(query_key_lower, query_key_upper);
+ if (advance_and_filter_result.status.ok()) {
bucket_pl_iters_raw_ptrs.push_back(bucket_pl_itr.get());
bucket_pl_iters_.push_back(std::move(bucket_pl_itr));
}
+ num_advance_calls_ += advance_and_filter_result.num_advance_calls;
+ num_blocks_inspected_ += advance_and_filter_result.num_blocks_inspected;
}
pq_ = std::priority_queue<BucketPostingListIterator*,
@@ -259,6 +279,12 @@ class IntegerIndexStorageIterator : public NumericIndex<int64_t>::Iterator {
DocHitInfo GetDocHitInfo() const override { return doc_hit_info_; }
+ int32_t GetNumAdvanceCalls() const override { return num_advance_calls_; }
+
+ int32_t GetNumBlocksInspected() const override {
+ return num_blocks_inspected_;
+ }
+
private:
BucketPostingListIterator::Comparator comparator_;
@@ -280,6 +306,9 @@ class IntegerIndexStorageIterator : public NumericIndex<int64_t>::Iterator {
pq_;
DocHitInfo doc_hit_info_;
+
+ int32_t num_advance_calls_;
+ int32_t num_blocks_inspected_;
};
libtextclassifier3::Status IntegerIndexStorageIterator::Advance() {
@@ -299,7 +328,12 @@ libtextclassifier3::Status IntegerIndexStorageIterator::Advance() {
do {
doc_hit_info_.UpdateSection(
bucket_itr->GetCurrentBasicHit().section_id());
- advance_status = bucket_itr->AdvanceAndFilter(key_lower_, key_upper_);
+ BucketPostingListIterator::AdvanceAndFilterResult
+ advance_and_filter_result =
+ bucket_itr->AdvanceAndFilter(key_lower_, key_upper_);
+ advance_status = std::move(advance_and_filter_result.status);
+ num_advance_calls_ += advance_and_filter_result.num_advance_calls;
+ num_blocks_inspected_ += advance_and_filter_result.num_blocks_inspected;
} while (advance_status.ok() &&
bucket_itr->GetCurrentBasicHit().document_id() == document_id);
if (advance_status.ok()) {
@@ -311,6 +345,11 @@ libtextclassifier3::Status IntegerIndexStorageIterator::Advance() {
}
bool IntegerIndexStorage::Options::IsValid() const {
+ if (num_data_threshold_for_bucket_split <=
+ kMinNumDataThresholdForBucketSplit) {
+ return false;
+ }
+
if (!HasCustomInitBuckets()) {
return true;
}
@@ -403,12 +442,20 @@ libtextclassifier3::Status IntegerIndexStorage::AddKeys(
return libtextclassifier3::Status::OK;
}
+ SetDirty();
+
std::sort(new_keys.begin(), new_keys.end());
// Dedupe
auto last = std::unique(new_keys.begin(), new_keys.end());
new_keys.erase(last, new_keys.end());
+ if (static_cast<int32_t>(new_keys.size()) >
+ std::numeric_limits<int32_t>::max() - info().num_data) {
+ return absl_ports::ResourceExhaustedError(
+ "# of keys in this integer index storage exceed the limit");
+ }
+
// When adding keys into a bucket, we potentially split it into 2 new buckets
// and one of them will be added into the unsorted bucket array.
// When handling keys belonging to buckets in the unsorted bucket array, we
@@ -649,6 +696,9 @@ libtextclassifier3::Status IntegerIndexStorage::TransferIndex(
return lhs.get() < rhs.get();
});
+ const int32_t num_data_threshold_for_bucket_merge =
+ kNumDataThresholdRatioForBucketMerge *
+ new_storage->options_.num_data_threshold_for_bucket_split;
int64_t curr_key_lower = std::numeric_limits<int64_t>::min();
int64_t curr_key_upper = std::numeric_limits<int64_t>::min();
std::vector<IntegerIndexData> accumulated_data;
@@ -687,7 +737,7 @@ libtextclassifier3::Status IntegerIndexStorage::TransferIndex(
// - Flush accumulated_data and create a new bucket for them.
// - OR merge new_data into accumulated_data and go to the next round.
if (!accumulated_data.empty() && accumulated_data.size() + new_data.size() >
- kNumDataThresholdForBucketMerge) {
+ num_data_threshold_for_bucket_merge) {
// TODO(b/259743562): [Optimization 3] adjust upper bound to fit more data
// from new_data to accumulated_data.
ICING_RETURN_IF_ERROR(FlushDataIntoNewSortedBucket(
@@ -879,9 +929,11 @@ IntegerIndexStorage::InitializeExistingFiles(
IntegerIndexStorage::FlushDataIntoNewSortedBucket(
int64_t key_lower, int64_t key_upper, std::vector<IntegerIndexData>&& data,
IntegerIndexStorage* storage) {
+ storage->SetDirty();
+
if (data.empty()) {
- return storage->sorted_buckets_->Append(
- Bucket(key_lower, key_upper, PostingListIdentifier::kInvalid));
+ return storage->sorted_buckets_->Append(Bucket(
+ key_lower, key_upper, PostingListIdentifier::kInvalid, /*num_data=*/0));
}
ICING_ASSIGN_OR_RETURN(
@@ -891,10 +943,16 @@ IntegerIndexStorage::FlushDataIntoNewSortedBucket(
data.end()));
storage->info().num_data += data.size();
- return storage->sorted_buckets_->Append(Bucket(key_lower, key_upper, pl_id));
+ return storage->sorted_buckets_->Append(
+ Bucket(key_lower, key_upper, pl_id, data.size()));
}
-libtextclassifier3::Status IntegerIndexStorage::PersistStoragesToDisk() {
+libtextclassifier3::Status IntegerIndexStorage::PersistStoragesToDisk(
+ bool force) {
+ if (!force && !is_storage_dirty()) {
+ return libtextclassifier3::Status::OK;
+ }
+
ICING_RETURN_IF_ERROR(sorted_buckets_->PersistToDisk());
ICING_RETURN_IF_ERROR(unsorted_buckets_->PersistToDisk());
if (!flash_index_storage_->PersistToDisk()) {
@@ -904,19 +962,35 @@ libtextclassifier3::Status IntegerIndexStorage::PersistStoragesToDisk() {
return libtextclassifier3::Status::OK;
}
-libtextclassifier3::Status IntegerIndexStorage::PersistMetadataToDisk() {
+libtextclassifier3::Status IntegerIndexStorage::PersistMetadataToDisk(
+ bool force) {
+ // We can skip persisting metadata to disk only if both info and storage are
+ // clean.
+ if (!force && !is_info_dirty() && !is_storage_dirty()) {
+ return libtextclassifier3::Status::OK;
+ }
+
// Changes should have been applied to the underlying file when using
// MemoryMappedFile::Strategy::READ_WRITE_AUTO_SYNC, but call msync() as an
// extra safety step to ensure they are written out.
return metadata_mmapped_file_->PersistToDisk();
}
-libtextclassifier3::StatusOr<Crc32> IntegerIndexStorage::ComputeInfoChecksum() {
+libtextclassifier3::StatusOr<Crc32> IntegerIndexStorage::ComputeInfoChecksum(
+ bool force) {
+ if (!force && !is_info_dirty()) {
+ return Crc32(crcs().component_crcs.info_crc);
+ }
+
return info().ComputeChecksum();
}
libtextclassifier3::StatusOr<Crc32>
-IntegerIndexStorage::ComputeStoragesChecksum() {
+IntegerIndexStorage::ComputeStoragesChecksum(bool force) {
+ if (!force && !is_storage_dirty()) {
+ return Crc32(crcs().component_crcs.storages_crc);
+ }
+
// Compute crcs
ICING_ASSIGN_OR_RETURN(Crc32 sorted_buckets_crc,
sorted_buckets_->ComputeChecksum());
@@ -933,6 +1007,89 @@ IntegerIndexStorage::AddKeysIntoBucketAndSplitIfNecessary(
const std::vector<int64_t>::const_iterator& it_start,
const std::vector<int64_t>::const_iterator& it_end,
FileBackedVector<Bucket>::MutableView& mutable_bucket) {
+ int32_t num_data_in_bucket = mutable_bucket.Get().num_data();
+ int32_t num_new_data = std::distance(it_start, it_end);
+ if (mutable_bucket.Get().key_lower() < mutable_bucket.Get().key_upper() &&
+ num_new_data + num_data_in_bucket >
+ options_.num_data_threshold_for_bucket_split) {
+ // Split bucket.
+
+ // 1. Read all data and free all posting lists.
+ std::vector<IntegerIndexData> all_data;
+ if (mutable_bucket.Get().posting_list_identifier().is_valid()) {
+ ICING_ASSIGN_OR_RETURN(
+ std::unique_ptr<PostingListIntegerIndexAccessor> pl_accessor,
+ PostingListIntegerIndexAccessor::CreateFromExisting(
+ flash_index_storage_.get(), posting_list_serializer_,
+ mutable_bucket.Get().posting_list_identifier()));
+ ICING_ASSIGN_OR_RETURN(all_data, pl_accessor->GetAllDataAndFree());
+ }
+
+ // 2. Append all new data.
+ all_data.reserve(all_data.size() + num_new_data);
+ for (auto it = it_start; it != it_end; ++it) {
+ all_data.push_back(IntegerIndexData(section_id, document_id, *it));
+ }
+
+ // 3. Run bucket splitting algorithm to decide new buckets and dispatch
+ // data.
+ // - # of data in a full bucket =
+ // options_.num_data_threshold_for_bucket_split.
+ // - Bucket splitting logic will be invoked if adding new data
+ // (num_new_data >= 1) into a full bucket.
+ // - In order to achieve good (amortized) time complexity, we want # of
+ // data in new buckets to be around half_of_threshold (i.e.
+ // options_.num_data_threshold_for_bucket_split / 2).
+ // - Using half_of_threshold as the cutoff threshold will cause splitting
+ // buckets with [half_of_threshold, half_of_threshold, num_new_data]
+ // data, which is not ideal because num_new_data is usually small.
+ // - Thus, we pick (half_of_threshold + kNumDataAfterSplitAdjustment) as
+ // the cutoff threshold to avoid over-splitting. It can tolerate
+ // num_new_data up to (2 * kNumDataAfterSplitAdjustment) and
+ // split only 2 buckets (instead of 3) with
+ // [half_of_threshold + kNumDataAfterSplitAdjustment,
+ // half_of_threshold + (kNumDataAfterSplitAdjustment - num_new_data)].
+ int32_t cutoff_threshold =
+ options_.num_data_threshold_for_bucket_split / 2 +
+ kNumDataAfterSplitAdjustment;
+ std::vector<integer_index_bucket_util::DataRangeAndBucketInfo>
+ new_bucket_infos = integer_index_bucket_util::Split(
+ all_data, mutable_bucket.Get().key_lower(),
+ mutable_bucket.Get().key_upper(), cutoff_threshold);
+ if (new_bucket_infos.empty()) {
+ ICING_LOG(WARNING)
+ << "No buckets after splitting. This should not happen.";
+ return absl_ports::InternalError("Split error");
+ }
+
+ // 4. Flush data and create new buckets.
+ std::vector<Bucket> new_buckets;
+ for (int i = 0; i < new_bucket_infos.size(); ++i) {
+ int32_t num_data_in_new_bucket =
+ std::distance(new_bucket_infos[i].start, new_bucket_infos[i].end);
+ ICING_ASSIGN_OR_RETURN(
+ PostingListIdentifier pl_id,
+ FlushDataIntoPostingLists(
+ flash_index_storage_.get(), posting_list_serializer_,
+ new_bucket_infos[i].start, new_bucket_infos[i].end));
+ if (i == 0) {
+ // Reuse mutable_bucket
+ mutable_bucket.Get().set_key_lower(new_bucket_infos[i].key_lower);
+ mutable_bucket.Get().set_key_upper(new_bucket_infos[i].key_upper);
+ mutable_bucket.Get().set_posting_list_identifier(pl_id);
+ mutable_bucket.Get().set_num_data(num_data_in_new_bucket);
+ } else {
+ new_buckets.push_back(Bucket(new_bucket_infos[i].key_lower,
+ new_bucket_infos[i].key_upper, pl_id,
+ num_data_in_new_bucket));
+ }
+ }
+
+ return new_buckets;
+ }
+
+ // Otherwise, we don't need to split bucket. Just simply add all new data into
+ // the bucket.
std::unique_ptr<PostingListIntegerIndexAccessor> pl_accessor;
if (mutable_bucket.Get().posting_list_identifier().is_valid()) {
ICING_ASSIGN_OR_RETURN(
@@ -946,68 +1103,6 @@ IntegerIndexStorage::AddKeysIntoBucketAndSplitIfNecessary(
}
for (auto it = it_start; it != it_end; ++it) {
- if (mutable_bucket.Get().key_lower() < mutable_bucket.Get().key_upper() &&
- pl_accessor->WantsSplit()) {
- // If the bucket needs split (max size and full) and is splittable, then
- // we perform bucket splitting.
-
- // 1. Finalize the current posting list accessor.
- PostingListAccessor::FinalizeResult result =
- std::move(*pl_accessor).Finalize();
- if (!result.status.ok()) {
- return result.status;
- }
-
- // 2. Create another posting list accessor instance. Read all data and
- // free all posting lists.
- ICING_ASSIGN_OR_RETURN(
- pl_accessor,
- PostingListIntegerIndexAccessor::CreateFromExisting(
- flash_index_storage_.get(), posting_list_serializer_, result.id));
- ICING_ASSIGN_OR_RETURN(std::vector<IntegerIndexData> all_data,
- pl_accessor->GetAllDataAndFree());
-
- // 3. Append all remaining new data.
- all_data.reserve(all_data.size() + std::distance(it, it_end));
- for (; it != it_end; ++it) {
- all_data.push_back(IntegerIndexData(section_id, document_id, *it));
- }
-
- // 4. Run bucket splitting algorithm to decide new buckets and dispatch
- // data.
- std::vector<integer_index_bucket_util::DataRangeAndBucketInfo>
- new_bucket_infos = integer_index_bucket_util::Split(
- all_data, mutable_bucket.Get().key_lower(),
- mutable_bucket.Get().key_upper(),
- kNumDataThresholdForBucketSplit);
- if (new_bucket_infos.empty()) {
- ICING_LOG(WARNING)
- << "No buckets after splitting. This should not happen.";
- return absl_ports::InternalError("Split error");
- }
-
- // 5. Flush data.
- std::vector<Bucket> new_buckets;
- for (int i = 0; i < new_bucket_infos.size(); ++i) {
- ICING_ASSIGN_OR_RETURN(
- PostingListIdentifier pl_id,
- FlushDataIntoPostingLists(
- flash_index_storage_.get(), posting_list_serializer_,
- new_bucket_infos[i].start, new_bucket_infos[i].end));
- if (i == 0) {
- // Reuse mutable_bucket
- mutable_bucket.Get().set_key_lower(new_bucket_infos[i].key_lower);
- mutable_bucket.Get().set_key_upper(new_bucket_infos[i].key_upper);
- mutable_bucket.Get().set_posting_list_identifier(pl_id);
- } else {
- new_buckets.push_back(Bucket(new_bucket_infos[i].key_lower,
- new_bucket_infos[i].key_upper, pl_id));
- }
- }
-
- return new_buckets;
- }
-
ICING_RETURN_IF_ERROR(pl_accessor->PrependData(
IntegerIndexData(section_id, document_id, *it)));
}
@@ -1022,6 +1117,9 @@ IntegerIndexStorage::AddKeysIntoBucketAndSplitIfNecessary(
}
mutable_bucket.Get().set_posting_list_identifier(result.id);
+ // We've already verified num_new_data won't exceed the limit of the entire
+ // storage, so it is safe to add to the counter of the bucket.
+ mutable_bucket.Get().set_num_data(num_data_in_bucket + num_new_data);
return std::vector<Bucket>();
}