Mirror of https://github.com/meilisearch/meilisearch.git (synced 2025-12-08 21:55:42 +00:00)

Compare commits: 23 commits (incrementa...tmp-increm)
| Author | SHA1 | Date |
|---|---|---|
|  | 79111230ee |  |
|  | d6957a8e5d |  |
|  | e83c021755 |  |
|  | 7ec7200378 |  |
|  | 6a577254fa |  |
|  | fd88c834c3 |  |
|  | b4005593f4 |  |
|  | 8ee3793259 |  |
|  | 3648abbfd5 |  |
|  | 4d2433de12 |  |
|  | 28cc6df7a3 |  |
|  | 34f4602ae8 |  |
|  | 7a9290aaae |  |
|  | 5d219587b8 |  |
|  | 6e9aa49893 |  |
|  | 6b3a2c7281 |  |
|  | 5908aec6cb |  |
|  | 19f48c15fb |  |
|  | 47b484c07c |  |
|  | 7d5e28b475 |  |
|  | 0648e06aa2 |  |
|  | 33921747b7 |  |
|  | 970a489dcc |  |
Cargo.lock (generated): 34 changed lines
@@ -496,7 +496,7 @@ source = "git+https://github.com/meilisearch/bbqueue#cbb87cc707b5af415ef203bdaf2

 [[package]]
 name = "benchmarks"
-version = "1.12.0"
+version = "1.12.1"
 dependencies = [
  "anyhow",
  "bumpalo",

@@ -689,7 +689,7 @@ dependencies = [

 [[package]]
 name = "build-info"
-version = "1.12.0"
+version = "1.12.1"
 dependencies = [
  "anyhow",
  "time",

@@ -1664,7 +1664,7 @@ dependencies = [

 [[package]]
 name = "dump"
-version = "1.12.0"
+version = "1.12.1"
 dependencies = [
  "anyhow",
  "big_s",

@@ -1876,7 +1876,7 @@ checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4"

 [[package]]
 name = "file-store"
-version = "1.12.0"
+version = "1.12.1"
 dependencies = [
  "tempfile",
  "thiserror",

@@ -1898,7 +1898,7 @@ dependencies = [

 [[package]]
 name = "filter-parser"
-version = "1.12.0"
+version = "1.12.1"
 dependencies = [
  "insta",
  "nom",

@@ -1918,7 +1918,7 @@ dependencies = [

 [[package]]
 name = "flatten-serde-json"
-version = "1.12.0"
+version = "1.12.1"
 dependencies = [
  "criterion",
  "serde_json",

@@ -2057,7 +2057,7 @@ dependencies = [

 [[package]]
 name = "fuzzers"
-version = "1.12.0"
+version = "1.12.1"
 dependencies = [
  "arbitrary",
  "bumpalo",

@@ -2624,7 +2624,7 @@ checksum = "206ca75c9c03ba3d4ace2460e57b189f39f43de612c2f85836e65c929701bb2d"

 [[package]]
 name = "index-scheduler"
-version = "1.12.0"
+version = "1.12.1"
 dependencies = [
  "anyhow",
  "arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",

@@ -2822,7 +2822,7 @@ dependencies = [

 [[package]]
 name = "json-depth-checker"
-version = "1.12.0"
+version = "1.12.1"
 dependencies = [
  "criterion",
  "serde_json",

@@ -3441,7 +3441,7 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"

 [[package]]
 name = "meili-snap"
-version = "1.12.0"
+version = "1.12.1"
 dependencies = [
  "insta",
  "md5",

@@ -3450,7 +3450,7 @@ dependencies = [

 [[package]]
 name = "meilisearch"
-version = "1.12.0"
+version = "1.12.1"
 dependencies = [
  "actix-cors",
  "actix-http",

@@ -3540,7 +3540,7 @@ dependencies = [

 [[package]]
 name = "meilisearch-auth"
-version = "1.12.0"
+version = "1.12.1"
 dependencies = [
  "base64 0.22.1",
  "enum-iterator",

@@ -3559,7 +3559,7 @@ dependencies = [

 [[package]]
 name = "meilisearch-types"
-version = "1.12.0"
+version = "1.12.1"
 dependencies = [
  "actix-web",
  "anyhow",

@@ -3592,7 +3592,7 @@ dependencies = [

 [[package]]
 name = "meilitool"
-version = "1.12.0"
+version = "1.12.1"
 dependencies = [
  "anyhow",
  "arroy 0.5.0 (git+https://github.com/meilisearch/arroy/?tag=DO-NOT-DELETE-upgrade-v04-to-v05)",

@@ -3627,7 +3627,7 @@ dependencies = [

 [[package]]
 name = "milli"
-version = "1.12.0"
+version = "1.12.1"
 dependencies = [
  "allocator-api2",
  "arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",

@@ -4083,7 +4083,7 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"

 [[package]]
 name = "permissive-json-pointer"
-version = "1.12.0"
+version = "1.12.1"
 dependencies = [
  "big_s",
  "serde_json",

@@ -6486,7 +6486,7 @@ dependencies = [

 [[package]]
 name = "xtask"
-version = "1.12.0"
+version = "1.12.1"
 dependencies = [
  "anyhow",
  "build-info",
@@ -22,7 +22,7 @@ members = [
 ]

 [workspace.package]
-version = "1.12.0"
+version = "1.12.1"
 authors = [
     "Quentin de Quelen <quentin@dequelen.me>",
     "Clément Renault <clement@meilisearch.com>",
@@ -1312,9 +1312,7 @@ impl IndexScheduler {
         if let DocumentOperation::Add(content_uuid) = operation {
             let content_file = self.file_store.get_update(*content_uuid)?;
             let mmap = unsafe { memmap2::Mmap::map(&content_file)? };
-            if !mmap.is_empty() {
-                content_files.push(mmap);
-            }
+            content_files.push(mmap);
         }
     }
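Note on this hunk: empty update payloads are no longer filtered out when the batch collects its memory-mapped content files; they now flow through and are handled later in the pipeline (see the `payload.is_empty()` hunk further down). One plausible reason, which is an assumption on my part since the page gives no rationale, is that downstream code pairs each operation with its payload by position, so silently dropping empty payloads would shift that pairing. A toy, self-contained illustration of that failure mode:

```rust
// Toy illustration (assumed rationale, not repository code): filtering
// payloads while keeping all operations misaligns the zip between them.
fn main() {
    let operations = ["add#0", "add#1", "add#2"];
    let payloads: Vec<&[u8]> = vec![b"docs-a", b"", b"docs-c"];

    // Buggy: skip empty payloads at collection time.
    let filtered: Vec<&[u8]> = payloads.iter().copied().filter(|p| !p.is_empty()).collect();
    let misaligned: Vec<_> = operations.iter().zip(&filtered).collect();
    assert_eq!(misaligned[1].0, &"add#1");
    assert_eq!(misaligned[1].1, &&b"docs-c"[..]); // add#1 got add#2's payload

    // Fixed: keep every payload, empty or not, so indices line up.
    let aligned: Vec<_> = operations.iter().zip(&payloads).collect();
    assert_eq!(aligned[1].1, &&b""[..]);
}
```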
@@ -291,7 +291,10 @@ impl IndexScheduler {

         debug_assert!(old_task != *task);
         debug_assert_eq!(old_task.uid, task.uid);
-        debug_assert!(old_task.batch_uid.is_none() && task.batch_uid.is_some());
+        debug_assert!(
+            old_task.batch_uid.is_none() && task.batch_uid.is_some(),
+            "\n==> old: {old_task:?}\n==> new: {task:?}"
+        );

         if old_task.status != task.status {
             self.update_status(wtxn, old_task.status, |bitmap| {
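The only change here is attaching a message to the assertion so that, when it trips in a debug build, the log shows both the old and the new task. A self-contained sketch of the same pattern with a stand-in `Task` type (not Meilisearch's):

```rust
#[derive(Debug)]
struct Task {
    uid: u32,
    batch_uid: Option<u32>,
}

fn check(old_task: &Task, task: &Task) {
    // Compiled out entirely in release builds; in debug builds the
    // message is rendered only when the condition is false, so the
    // Debug dump of both tasks costs nothing on the happy path.
    debug_assert!(
        old_task.batch_uid.is_none() && task.batch_uid.is_some(),
        "\n==> old: {old_task:?}\n==> new: {task:?}"
    );
}

fn main() {
    let old = Task { uid: 1, batch_uid: None };
    let new = Task { uid: 1, batch_uid: Some(0) };
    check(&old, &new);
}
```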
@@ -1220,9 +1220,89 @@ async fn replace_document() {

 #[actix_rt::test]
 async fn add_no_documents() {
     let server = Server::new().await;
-    let index = server.index("test");
-    let (_response, code) = index.add_documents(json!([]), None).await;
+    let index = server.index("kefir");
+    let (task, code) = index.add_documents(json!([]), None).await;
     snapshot!(code, @"202 Accepted");
+    let task = server.wait_task(task.uid()).await;
+    let task = task.succeeded();
+    snapshot!(task, @r#"
+    {
+      "uid": "[uid]",
+      "batchUid": "[batch_uid]",
+      "indexUid": "kefir",
+      "status": "succeeded",
+      "type": "documentAdditionOrUpdate",
+      "canceledBy": null,
+      "details": {
+        "receivedDocuments": 0,
+        "indexedDocuments": 0
+      },
+      "error": null,
+      "duration": "[duration]",
+      "enqueuedAt": "[date]",
+      "startedAt": "[date]",
+      "finishedAt": "[date]"
+    }
+    "#);
+
+    let (task, _code) = index.add_documents(json!([]), Some("kefkef")).await;
+    let task = server.wait_task(task.uid()).await;
+    let task = task.succeeded();
+    snapshot!(task, @r#"
+    {
+      "uid": "[uid]",
+      "batchUid": "[batch_uid]",
+      "indexUid": "kefir",
+      "status": "succeeded",
+      "type": "documentAdditionOrUpdate",
+      "canceledBy": null,
+      "details": {
+        "receivedDocuments": 0,
+        "indexedDocuments": 0
+      },
+      "error": null,
+      "duration": "[duration]",
+      "enqueuedAt": "[date]",
+      "startedAt": "[date]",
+      "finishedAt": "[date]"
+    }
+    "#);
+
+    let (task, _code) = index.add_documents(json!([{ "kefkef": 1 }]), None).await;
+    let task = server.wait_task(task.uid()).await;
+    let task = task.succeeded();
+    snapshot!(task, @r#"
+    {
+      "uid": "[uid]",
+      "batchUid": "[batch_uid]",
+      "indexUid": "kefir",
+      "status": "succeeded",
+      "type": "documentAdditionOrUpdate",
+      "canceledBy": null,
+      "details": {
+        "receivedDocuments": 1,
+        "indexedDocuments": 1
+      },
+      "error": null,
+      "duration": "[duration]",
+      "enqueuedAt": "[date]",
+      "startedAt": "[date]",
+      "finishedAt": "[date]"
+    }
+    "#);
+    let (documents, _status) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
+    snapshot!(documents, @r#"
+    {
+      "results": [
+        {
+          "kefkef": 1
+        }
+      ],
+      "offset": 0,
+      "limit": 20,
+      "total": 1
+    }
+    "#);
 }

 #[actix_rt::test]
@@ -79,22 +79,29 @@ pub const FACET_MIN_LEVEL_SIZE: u8 = 5;

 use std::collections::BTreeSet;
 use std::fs::File;
 use std::io::BufReader;
+use std::ops::Bound;

 use grenad::Merger;
+use heed::types::{Bytes, DecodeIgnore};
+use heed::BytesDecode as _;
 use roaring::RoaringBitmap;
 use time::OffsetDateTime;
 use tracing::debug;

 use self::incremental::FacetsUpdateIncremental;
 use super::{FacetsUpdateBulk, MergeDeladdBtreesetString, MergeDeladdCboRoaringBitmaps};
 use crate::facet::FacetType;
-use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec, FacetGroupValueCodec};
+use crate::heed_codec::facet::{
+    FacetGroupKey, FacetGroupKeyCodec, FacetGroupValueCodec, OrderedF64Codec,
+};
 use crate::heed_codec::BytesRefCodec;
+use crate::search::facet::get_highest_level;
 use crate::update::del_add::{DelAdd, KvReaderDelAdd};
 use crate::{try_split_array_at, FieldId, Index, Result};

 pub mod bulk;
 pub mod incremental;
+pub mod new_incremental;

 /// A builder used to add new elements to the `facet_id_string_docids` or `facet_id_f64_docids` databases.
 ///
@@ -646,3 +653,194 @@ mod comparison_bench {
         }
     }
 }
+
+/// Run sanity checks on the specified fid tree
+///
+/// 1. No "orphan" child value, any child value has a parent
+/// 2. Any docid in the child appears in the parent
+/// 3. No docid in the parent is missing from all its children
+/// 4. no group is bigger than max_group_size
+/// 5. Less than 50% of groups are bigger than group_size
+/// 6. group size matches the number of children
+/// 7. max_level is < 255
+pub(crate) fn sanity_checks(
+    index: &Index,
+    rtxn: &heed::RoTxn,
+    field_id: FieldId,
+    facet_type: FacetType,
+    group_size: usize,
+    _min_level_size: usize, // might add a check on level size later
+    max_group_size: usize,
+) -> Result<()> {
+    tracing::info!(%field_id, ?facet_type, "performing sanity checks");
+    let database = match facet_type {
+        FacetType::String => {
+            index.facet_id_string_docids.remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>()
+        }
+        FacetType::Number => {
+            index.facet_id_f64_docids.remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>()
+        }
+    };
+
+    let leaf_prefix: FacetGroupKey<&[u8]> = FacetGroupKey { field_id, level: 0, left_bound: &[] };
+
+    let leaf_it = database.prefix_iter(rtxn, &leaf_prefix)?;
+
+    let max_level = get_highest_level(rtxn, database, field_id)?;
+    if max_level == u8::MAX {
+        panic!("max_level == 255");
+    }
+
+    for leaf in leaf_it {
+        let (leaf_facet_value, leaf_docids) = leaf?;
+        let mut current_level = 0;
+
+        let mut current_parent_facet_value: Option<FacetGroupKey<&[u8]>> = None;
+        let mut current_parent_docids: Option<crate::heed_codec::facet::FacetGroupValue> = None;
+        loop {
+            current_level += 1;
+            if current_level >= max_level {
+                break;
+            }
+            let parent_key_right_bound = FacetGroupKey {
+                field_id,
+                level: current_level,
+                left_bound: leaf_facet_value.left_bound,
+            };
+            let (parent_facet_value, parent_docids) = database
+                .get_lower_than_or_equal_to(rtxn, &parent_key_right_bound)?
+                .expect("no parent found");
+            if parent_facet_value.level != current_level {
+                panic!(
+                    "wrong parent level, found_level={}, expected_level={}",
+                    parent_facet_value.level, current_level
+                );
+            }
+            if parent_facet_value.field_id != field_id {
+                panic!("wrong parent fid");
+            }
+            if parent_facet_value.left_bound > leaf_facet_value.left_bound {
+                panic!("wrong parent left bound");
+            }
+
+            if !leaf_docids.bitmap.is_subset(&parent_docids.bitmap) {
+                panic!(
+                    "missing docids from leaf in parent, current_level={}, parent={}, child={}, missing={missing:?}, child_len={}, child={:?}",
+                    current_level,
+                    facet_to_string(parent_facet_value.left_bound, facet_type),
+                    facet_to_string(leaf_facet_value.left_bound, facet_type),
+                    leaf_docids.bitmap.len(),
+                    leaf_docids.bitmap.clone(),
+                    missing=leaf_docids.bitmap - parent_docids.bitmap,
+                )
+            }
+
+            if let Some(current_parent_facet_value) = current_parent_facet_value {
+                if current_parent_facet_value.field_id != parent_facet_value.field_id {
+                    panic!("wrong parent parent fid");
+                }
+                if current_parent_facet_value.level + 1 != parent_facet_value.level {
+                    panic!("wrong parent parent level");
+                }
+                if current_parent_facet_value.left_bound < parent_facet_value.left_bound {
+                    panic!("wrong parent parent left bound");
+                }
+            }
+
+            if let Some(current_parent_docids) = current_parent_docids {
+                if !current_parent_docids.bitmap.is_subset(&parent_docids.bitmap) {
+                    panic!("missing docids from intermediate node in parent, parent_level={}, parent={}, intermediate={}, missing={missing:?}, intermediate={:?}",
+                        parent_facet_value.level,
+                        facet_to_string(parent_facet_value.left_bound, facet_type),
+                        facet_to_string(current_parent_facet_value.unwrap().left_bound, facet_type),
+                        current_parent_docids.bitmap.clone(),
+                        missing=current_parent_docids.bitmap - parent_docids.bitmap,
+                    );
+                }
+            }
+
+            current_parent_facet_value = Some(parent_facet_value);
+            current_parent_docids = Some(parent_docids);
+        }
+    }
+    tracing::info!(%field_id, ?facet_type, "checked all leaves");
+
+    let mut current_level = max_level;
+    let mut greater_than_group = 0usize;
+    let mut total = 0usize;
+    loop {
+        if current_level == 0 {
+            break;
+        }
+        let child_level = current_level - 1;
+        tracing::info!(%field_id, ?facet_type, %current_level, "checked groups for level");
+        let level_groups_prefix: FacetGroupKey<&[u8]> =
+            FacetGroupKey { field_id, level: current_level, left_bound: &[] };
+        let mut level_groups_it = database.prefix_iter(rtxn, &level_groups_prefix)?.peekable();
+
+        'group_it: loop {
+            let Some(group) = level_groups_it.next() else { break 'group_it };
+
+            let (group_facet_value, group_docids) = group?;
+            let child_left_bound = group_facet_value.left_bound.to_owned();
+            let mut expected_docids = RoaringBitmap::new();
+            let mut expected_size = 0usize;
+            let right_bound = level_groups_it
+                .peek()
+                .and_then(|res| res.as_ref().ok())
+                .map(|(key, _)| key.left_bound);
+            let child_left_bound = FacetGroupKey {
+                field_id,
+                level: child_level,
+                left_bound: child_left_bound.as_slice(),
+            };
+            let child_left_bound = Bound::Included(&child_left_bound);
+            let child_right_bound;
+            let child_right_bound = if let Some(right_bound) = right_bound {
+                child_right_bound =
+                    FacetGroupKey { field_id, level: child_level, left_bound: right_bound };
+                Bound::Excluded(&child_right_bound)
+            } else {
+                Bound::Unbounded
+            };
+            let children = database.range(rtxn, &(child_left_bound, child_right_bound))?;
+            for child in children {
+                let (child_facet_value, child_docids) = child?;
+                if child_facet_value.field_id != field_id {
+                    break;
+                }
+                if child_facet_value.level != child_level {
+                    break;
+                }
+                expected_size += 1;
+                expected_docids |= &child_docids.bitmap;
+            }
+            assert_eq!(expected_size, group_docids.size as usize);
+            assert!(expected_size <= max_group_size);
+            assert_eq!(expected_docids, group_docids.bitmap);
+            total += 1;
+            if expected_size > group_size {
+                greater_than_group += 1;
+            }
+        }
+
+        current_level -= 1;
+    }
+    if greater_than_group * 2 > total {
+        panic!("too many groups have a size > group_size");
+    }
+
+    tracing::info!("sanity checks OK");
+
+    Ok(())
+}
+
+fn facet_to_string(facet_value: &[u8], facet_type: FacetType) -> String {
+    match facet_type {
+        FacetType::String => bstr::BStr::new(facet_value).to_string(),
+        FacetType::Number => match OrderedF64Codec::bytes_decode(facet_value) {
+            Ok(value) => value.to_string(),
+            Err(e) => format!("error: {e} (bytes: {facet_value:?}"),
+        },
+    }
+}
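A hedged sketch of how these checks might be wired into a test in the same module. It assumes the neighboring `FACET_GROUP_SIZE` and `FACET_MAX_GROUP_SIZE` constants exist alongside the `FACET_MIN_LEVEL_SIZE` visible in the hunk header above; nothing here is new API, it only forwards to `sanity_checks`:

```rust
// Minimal sketch, assuming this sits inside facet/mod.rs where
// sanity_checks and the FACET_* constants are in scope.
#[cfg(test)]
fn check_all_invariants(
    index: &crate::Index,
    rtxn: &heed::RoTxn,
    field_id: crate::FieldId,
) -> crate::Result<()> {
    use crate::facet::FacetType;
    // Verify the seven invariants for both facet databases of the field.
    for facet_type in [FacetType::String, FacetType::Number] {
        sanity_checks(
            index,
            rtxn,
            field_id,
            facet_type,
            FACET_GROUP_SIZE as usize,
            FACET_MIN_LEVEL_SIZE as usize,
            FACET_MAX_GROUP_SIZE as usize,
        )?;
    }
    Ok(())
}
```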
crates/milli/src/update/facet/new_incremental.rs (new file, 498 lines)
@@ -0,0 +1,498 @@ (new file; every line below is added)
use std::ops::Bound;

use heed::types::{Bytes, DecodeIgnore};
use heed::{BytesDecode as _, Database, RwTxn};
use roaring::RoaringBitmap;

use crate::facet::FacetType;
use crate::heed_codec::facet::{
    FacetGroupKey, FacetGroupKeyCodec, FacetGroupValue, FacetGroupValueCodec,
};
use crate::heed_codec::BytesRefCodec;
use crate::search::facet::get_highest_level;
use crate::update::valid_facet_value;
use crate::{FieldId, Index, Result};

pub struct FacetsUpdateIncremental {
    inner: FacetsUpdateIncrementalInner,
    delta_data: Vec<FacetFieldIdChange>,
}

struct FacetsUpdateIncrementalInner {
    db: Database<FacetGroupKeyCodec<BytesRefCodec>, FacetGroupValueCodec>,
    field_id: FieldId,
    group_size: u8,
    min_level_size: u8,
    max_group_size: u8,
}

impl FacetsUpdateIncremental {
    pub fn new(
        index: &Index,
        facet_type: FacetType,
        field_id: FieldId,
        delta_data: Vec<FacetFieldIdChange>,
        group_size: u8,
        min_level_size: u8,
        max_group_size: u8,
    ) -> Self {
        FacetsUpdateIncremental {
            inner: FacetsUpdateIncrementalInner {
                db: match facet_type {
                    FacetType::String => index
                        .facet_id_string_docids
                        .remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>(),
                    FacetType::Number => index
                        .facet_id_f64_docids
                        .remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>(),
                },
                field_id,
                group_size,
                min_level_size,
                max_group_size,
            },
            delta_data,
        }
    }

    #[tracing::instrument(level = "trace", skip_all, target = "indexing::facets::incremental")]
    pub fn execute(mut self, wtxn: &mut RwTxn) -> Result<()> {
        if self.delta_data.is_empty() {
            return Ok(());
        }
        self.delta_data.sort_unstable_by(
            |FacetFieldIdChange { facet_value: left, .. },
             FacetFieldIdChange { facet_value: right, .. }| {
                left.cmp(right)
                    // sort in **reverse** lexicographic order
                    .reverse()
            },
        );

        self.inner.find_changed_parents(wtxn, self.delta_data)?;

        self.inner.add_or_delete_level(wtxn)
    }
}

impl FacetsUpdateIncrementalInner {
    /// WARNING: `changed_children` must be sorted in **reverse** lexicographic order.
    fn find_changed_parents(
        &self,
        wtxn: &mut RwTxn,
        mut changed_children: Vec<FacetFieldIdChange>,
    ) -> Result<()> {
        let mut changed_parents = vec![];
        for child_level in 0u8..u8::MAX {
            // child_level < u8::MAX by construction
            let parent_level = child_level + 1;
            let parent_level_left_bound: FacetGroupKey<&[u8]> =
                FacetGroupKey { field_id: self.field_id, level: parent_level, left_bound: &[] };

            let mut last_parent: Option<Box<[u8]>> = None;
            let mut child_it = changed_children
                // drain all changed children
                .drain(..)
                // keep only children whose value is valid in the LMDB sense
                .filter(|child| valid_facet_value(&child.facet_value));
            // `while let` rather than `for` because we advance `child_it` inside of the loop
            'current_level: while let Some(child) = child_it.next() {
                if let Some(last_parent) = &last_parent {
                    if &child.facet_value >= last_parent {
                        self.compute_parent_group(wtxn, child_level, child.facet_value)?;
                        continue 'current_level;
                    }
                }

                // need to find a new parent
                let parent_key_prefix = FacetGroupKey {
                    field_id: self.field_id,
                    level: parent_level,
                    left_bound: &*child.facet_value,
                };

                let parent = self
                    .db
                    .remap_data_type::<DecodeIgnore>()
                    .rev_range(
                        wtxn,
                        &(
                            Bound::Excluded(&parent_level_left_bound),
                            Bound::Included(&parent_key_prefix),
                        ),
                    )?
                    .next();

                match parent {
                    Some(Ok((parent_key, _parent_value))) => {
                        // found parent, cache it for next keys
                        last_parent = Some(parent_key.left_bound.to_owned().into_boxed_slice());

                        // add to modified list for parent level
                        changed_parents.push(FacetFieldIdChange {
                            facet_value: parent_key.left_bound.to_owned().into_boxed_slice(),
                        });
                        self.compute_parent_group(wtxn, child_level, child.facet_value)?;
                    }
                    Some(Err(err)) => return Err(err.into()),
                    None => {
                        // no parent for that key
                        let mut parent_it = self
                            .db
                            .remap_data_type::<DecodeIgnore>()
                            .prefix_iter_mut(wtxn, &parent_level_left_bound)?;
                        match parent_it.next() {
                            // 1. left of the current left bound, or
                            Some(Ok((first_key, _first_value))) => {
                                // make sure we don't spill on the neighboring fid (level also included defensively)
                                if first_key.field_id != self.field_id
                                    || first_key.level != parent_level
                                {
                                    // max level reached, exit
                                    drop(parent_it);
                                    self.compute_parent_group(
                                        wtxn,
                                        child_level,
                                        child.facet_value,
                                    )?;
                                    for child in child_it.by_ref() {
                                        self.compute_parent_group(
                                            wtxn,
                                            child_level,
                                            child.facet_value,
                                        )?;
                                    }
                                    return Ok(());
                                }
                                // remove old left bound
                                unsafe { parent_it.del_current()? };
                                drop(parent_it);
                                changed_parents.push(FacetFieldIdChange {
                                    facet_value: child.facet_value.clone(),
                                });
                                self.compute_parent_group(wtxn, child_level, child.facet_value)?;
                                // pop all elements in order to visit the new left bound
                                let new_left_bound =
                                    &mut changed_parents.last_mut().unwrap().facet_value;
                                for child in child_it.by_ref() {
                                    new_left_bound.clone_from(&child.facet_value);

                                    self.compute_parent_group(
                                        wtxn,
                                        child_level,
                                        child.facet_value,
                                    )?;
                                }
                            }
                            Some(Err(err)) => return Err(err.into()),
                            // 2. max level reached, exit
                            None => {
                                drop(parent_it);
                                self.compute_parent_group(wtxn, child_level, child.facet_value)?;
                                for child in child_it.by_ref() {
                                    self.compute_parent_group(
                                        wtxn,
                                        child_level,
                                        child.facet_value,
                                    )?;
                                }
                                return Ok(());
                            }
                        }
                    }
                }
            }
            if changed_parents.is_empty() {
                return Ok(());
            }
            drop(child_it);
            std::mem::swap(&mut changed_children, &mut changed_parents);
            // changed_parents is now empty because changed_children was emptied by the drain
        }
        Ok(())
    }

    fn compute_parent_group(
        &self,
        wtxn: &mut RwTxn<'_>,
        parent_level: u8,
        parent_left_bound: Box<[u8]>,
    ) -> Result<()> {
        let mut range_left_bound: Vec<u8> = parent_left_bound.into();
        if parent_level == 0 {
            return Ok(());
        }
        let child_level = parent_level - 1;

        let parent_key = FacetGroupKey {
            field_id: self.field_id,
            level: parent_level,
            left_bound: &*range_left_bound,
        };
        let child_right_bound = self
            .db
            .remap_data_type::<DecodeIgnore>()
            .get_greater_than(wtxn, &parent_key)?
            .and_then(
                |(
                    FacetGroupKey {
                        level: right_level,
                        field_id: right_fid,
                        left_bound: right_bound,
                    },
                    _,
                )| {
                    if parent_level != right_level || self.field_id != right_fid {
                        // there was a greater key, but with a greater level or fid, so not a sibling to the parent: ignore
                        return None;
                    }
                    Some(right_bound.to_owned())
                },
            );
        let child_right_bound = match &child_right_bound {
            Some(right_bound) => Bound::Excluded(FacetGroupKey {
                left_bound: right_bound.as_slice(),
                field_id: self.field_id,
                level: child_level,
            }),
            None => Bound::Unbounded,
        };

        let child_left_key = FacetGroupKey {
            field_id: self.field_id,
            level: child_level,
            left_bound: &*range_left_bound,
        };
        let mut child_left_bound = Bound::Included(child_left_key);

        loop {
            // do a first pass on the range to find the number of children
            let child_count = self
                .db
                .remap_data_type::<DecodeIgnore>()
                .range(wtxn, &(child_left_bound, child_right_bound))?
                .take(self.max_group_size as usize * 2)
                .count();
            let mut child_it = self.db.range(wtxn, &(child_left_bound, child_right_bound))?;

            // pick the right group_size depending on the number of children
            let group_size = if child_count >= self.max_group_size as usize * 2 {
                // more than twice the max_group_size => there will be space for at least 2 groups of max_group_size
                self.max_group_size as usize
            } else if child_count >= self.group_size as usize {
                // size in [group_size, max_group_size * 2[
                // divided by 2 it is between [group_size / 2, max_group_size[
                // this ensures that the tree is balanced
                child_count / 2
            } else {
                // take everything
                child_count
            };

            let res: Result<_> = child_it
                .by_ref()
                .take(group_size)
                // stop if we go to the next level or field id
                .take_while(|res| match res {
                    Ok((child_key, _)) => {
                        child_key.field_id == self.field_id && child_key.level == child_level
                    }
                    Err(_) => true,
                })
                .try_fold(
                    (None, FacetGroupValue { size: 0, bitmap: Default::default() }),
                    |(bounds, mut group_value), child_res| {
                        let (child_key, child_value) = child_res?;
                        let bounds = match bounds {
                            Some((left_bound, _)) => Some((left_bound, child_key.left_bound)),
                            None => Some((child_key.left_bound, child_key.left_bound)),
                        };
                        // max_group_size <= u8::MAX
                        group_value.size += 1;
                        group_value.bitmap |= &child_value.bitmap;
                        Ok((bounds, group_value))
                    },
                );

            let (bounds, group_value) = res?;

            let Some((group_left_bound, right_bound)) = bounds else {
                let update_key = FacetGroupKey {
                    field_id: self.field_id,
                    level: parent_level,
                    left_bound: &*range_left_bound,
                };
                drop(child_it);
                if let Bound::Included(_) = child_left_bound {
                    self.db.delete(wtxn, &update_key)?;
                }

                break;
            };

            drop(child_it);
            let current_left_bound = group_left_bound.to_owned();

            let delete_old_bound = match child_left_bound {
                Bound::Included(bound) => {
                    if bound.left_bound != current_left_bound {
                        Some(range_left_bound.clone())
                    } else {
                        None
                    }
                }
                _ => None,
            };

            range_left_bound.clear();
            range_left_bound.extend_from_slice(right_bound);
            let child_left_key = FacetGroupKey {
                field_id: self.field_id,
                level: child_level,
                left_bound: range_left_bound.as_slice(),
            };
            child_left_bound = Bound::Excluded(child_left_key);

            if let Some(old_bound) = delete_old_bound {
                let update_key = FacetGroupKey {
                    field_id: self.field_id,
                    level: parent_level,
                    left_bound: old_bound.as_slice(),
                };
                self.db.delete(wtxn, &update_key)?;
            }

            let update_key = FacetGroupKey {
                field_id: self.field_id,
                level: parent_level,
                left_bound: current_left_bound.as_slice(),
            };
            if group_value.bitmap.is_empty() {
                self.db.delete(wtxn, &update_key)?;
            } else {
                self.db.put(wtxn, &update_key, &group_value)?;
            }
        }

        Ok(())
    }

    /// Check whether the highest level has exceeded `min_level_size` * `self.group_size`.
    /// If it has, we must build an addition level above it.
    /// Then check whether the highest level is under `min_level_size`.
    /// If it has, we must remove the complete level.
    pub(crate) fn add_or_delete_level(&self, txn: &mut RwTxn<'_>) -> Result<()> {
        let highest_level = get_highest_level(txn, self.db, self.field_id)?;
        let mut highest_level_prefix = vec![];
        highest_level_prefix.extend_from_slice(&self.field_id.to_be_bytes());
        highest_level_prefix.push(highest_level);

        let size_highest_level =
            self.db.remap_types::<Bytes, Bytes>().prefix_iter(txn, &highest_level_prefix)?.count();

        if size_highest_level >= self.group_size as usize * self.min_level_size as usize {
            self.add_level(txn, highest_level, &highest_level_prefix, size_highest_level)
        } else if size_highest_level < self.min_level_size as usize && highest_level != 0 {
            self.delete_level(txn, &highest_level_prefix)
        } else {
            Ok(())
        }
    }

    /// Delete a level.
    fn delete_level(&self, txn: &mut RwTxn<'_>, highest_level_prefix: &[u8]) -> Result<()> {
        let mut to_delete = vec![];
        let mut iter =
            self.db.remap_types::<Bytes, Bytes>().prefix_iter(txn, highest_level_prefix)?;
        for el in iter.by_ref() {
            let (k, _) = el?;
            to_delete.push(
                FacetGroupKeyCodec::<BytesRefCodec>::bytes_decode(k)
                    .map_err(heed::Error::Encoding)?
                    .into_owned(),
            );
        }
        drop(iter);
        for k in to_delete {
            self.db.delete(txn, &k.as_ref())?;
        }
        Ok(())
    }

    /// Build an additional level for the field id.
    fn add_level(
        &self,
        txn: &mut RwTxn<'_>,
        highest_level: u8,
        highest_level_prefix: &[u8],
        size_highest_level: usize,
    ) -> Result<()> {
        let mut groups_iter = self
            .db
            .remap_types::<Bytes, FacetGroupValueCodec>()
            .prefix_iter(txn, highest_level_prefix)?;

        let nbr_new_groups = size_highest_level / self.group_size as usize;
        let nbr_leftover_elements = size_highest_level % self.group_size as usize;

        let mut to_add = vec![];
        for _ in 0..nbr_new_groups {
            let mut first_key = None;
            let mut values = RoaringBitmap::new();
            for _ in 0..self.group_size {
                let (key_bytes, value_i) = groups_iter.next().unwrap()?;
                let key_i = FacetGroupKeyCodec::<BytesRefCodec>::bytes_decode(key_bytes)
                    .map_err(heed::Error::Encoding)?;

                if first_key.is_none() {
                    first_key = Some(key_i);
                }
                values |= value_i.bitmap;
            }
            let key = FacetGroupKey {
                field_id: self.field_id,
                level: highest_level + 1,
                left_bound: first_key.unwrap().left_bound,
            };
            let value = FacetGroupValue { size: self.group_size, bitmap: values };
            to_add.push((key.into_owned(), value));
        }
        // now we add the rest of the level, in case its size is > group_size * min_level_size
        // this can indeed happen if the min_level_size parameter changes between two calls to `insert`
        if nbr_leftover_elements > 0 {
            let mut first_key = None;
            let mut values = RoaringBitmap::new();
            for _ in 0..nbr_leftover_elements {
                let (key_bytes, value_i) = groups_iter.next().unwrap()?;
                let key_i = FacetGroupKeyCodec::<BytesRefCodec>::bytes_decode(key_bytes)
                    .map_err(heed::Error::Encoding)?;

                if first_key.is_none() {
                    first_key = Some(key_i);
                }
                values |= value_i.bitmap;
            }
            let key = FacetGroupKey {
                field_id: self.field_id,
                level: highest_level + 1,
                left_bound: first_key.unwrap().left_bound,
            };
            // Note: nbr_leftover_elements can be casted to a u8 since it is bounded by `max_group_size`
            // when it is created above.
            let value = FacetGroupValue { size: nbr_leftover_elements as u8, bitmap: values };
            to_add.push((key.into_owned(), value));
        }

        drop(groups_iter);
        for (key, value) in to_add {
            self.db.put(txn, &key.as_ref(), &value)?;
        }
        Ok(())
    }
}

#[derive(Debug)]
pub struct FacetFieldIdChange {
    pub facet_value: Box<[u8]>,
}
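Taken as a whole, this new file rebuilds only the ancestors of changed level-0 entries: `find_changed_parents` climbs one level at a time (children sorted in reverse lexicographic order so each parent is visited once), `compute_parent_group` regroups a parent's children between two sibling bounds, and `add_or_delete_level` grows or shrinks the top of the tree. A hedged sketch of how a caller might drive it, assuming the `FACET_*` constants from `facet/mod.rs` and that level-0 entries for the changed values were already written; the field id and values are illustrative only:

```rust
// Minimal sketch, assuming crate-internal context (a milli Index and an
// open write transaction). The facet values and field id are made up.
use crate::facet::FacetType;
use crate::update::facet::new_incremental::{FacetFieldIdChange, FacetsUpdateIncremental};
use crate::update::facet::{FACET_GROUP_SIZE, FACET_MAX_GROUP_SIZE, FACET_MIN_LEVEL_SIZE};

fn refresh_facet_levels(index: &crate::Index, wtxn: &mut heed::RwTxn) -> crate::Result<()> {
    // One entry per level-0 facet value whose posting list changed.
    let delta_data = vec![
        FacetFieldIdChange { facet_value: b"blue".to_vec().into_boxed_slice() },
        FacetFieldIdChange { facet_value: b"red".to_vec().into_boxed_slice() },
    ];
    FacetsUpdateIncremental::new(
        index,
        FacetType::String,
        /* field_id */ 0,
        delta_data,
        FACET_GROUP_SIZE,
        FACET_MIN_LEVEL_SIZE,
        FACET_MAX_GROUP_SIZE,
    )
    .execute(wtxn)
}
```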
@@ -10,10 +10,14 @@ use fst::{IntoStreamer, Streamer};
 pub use grenad_helpers::*;
 pub use merge_functions::*;

-use crate::MAX_WORD_LENGTH;
+use crate::MAX_LMDB_KEY_LENGTH;

 pub fn valid_lmdb_key(key: impl AsRef<[u8]>) -> bool {
-    key.as_ref().len() <= MAX_WORD_LENGTH * 2 && !key.as_ref().is_empty()
+    key.as_ref().len() <= MAX_LMDB_KEY_LENGTH - 3 && !key.as_ref().is_empty()
 }

+pub fn valid_facet_value(facet_value: impl AsRef<[u8]>) -> bool {
+    facet_value.as_ref().len() <= MAX_LMDB_KEY_LENGTH - 3 && !facet_value.as_ref().is_empty()
+}
+
 /// Divides one slice into two at an index, returns `None` if mid is out of bounds.
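Why `- 3`? Presumably the budget left for the facet value once the key's fixed prefix is accounted for: `FacetGroupKey` is serialized as a big-endian `u16` field id plus a one-byte level before the bound bytes. A minimal sketch of that arithmetic, assuming LMDB's default 511-byte key limit; neither the constant's actual value nor the codec layout is shown on this page, so both are assumptions:

```rust
// Sketch of the key budget. MAX_LMDB_KEY_LENGTH = 511 is an assumption
// (LMDB's default compile-time limit), not a value taken from this diff.
// A facet key is assumed to serialize as:
//   [field_id: u16 BE][level: u8][left_bound bytes...]
const MAX_LMDB_KEY_LENGTH: usize = 511; // assumed default
const FACET_KEY_PREFIX_LEN: usize = 2 /* field_id */ + 1 /* level */;

fn facet_value_fits(facet_value: &[u8]) -> bool {
    // Mirrors valid_facet_value above: non-empty and within budget.
    !facet_value.is_empty()
        && facet_value.len() <= MAX_LMDB_KEY_LENGTH - FACET_KEY_PREFIX_LEN
}

fn main() {
    assert!(facet_value_fits(b"blue"));
    assert!(!facet_value_fits(&[0u8; 512]));
}
```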
@@ -1,5 +1,5 @@
 ---
-source: milli/src/update/index_documents/mod.rs
+source: crates/milli/src/update/index_documents/mod.rs
 ---
 3 0 48.9021 1 [19, ]
 3 0 49.9314 1 [17, ]

@@ -15,6 +15,11 @@ source: milli/src/update/index_documents/mod.rs
 3 0 50.7453 1 [7, ]
 3 0 50.8466 1 [10, ]
 3 0 51.0537 1 [9, ]
+3 1 48.9021 2 [17, 19, ]
+3 1 50.1793 3 [13, 14, 15, ]
+3 1 50.4502 4 [0, 3, 8, 12, ]
+3 1 50.6312 2 [1, 2, ]
+3 1 50.7453 3 [7, 9, 10, ]
 4 0 2.271 1 [17, ]
 4 0 2.3708 1 [19, ]
 4 0 2.7637 1 [14, ]

@@ -28,4 +33,3 @@ source: milli/src/update/index_documents/mod.rs
 4 0 3.6957 1 [9, ]
 4 0 3.9623 1 [12, ]
 4 0 4.337 1 [10, ]
-
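Reading these snapshot rows: each line appears to be `field_id level left_bound size [docids]`, so the five added rows are level-1 groups now sitting above the level-0 entries, e.g. `3 1 48.9021 2 [17, 19, ]` groups the two leaves for 48.9021 and 49.9314. A sketch of that interpretation; the column meaning is inferred from `FacetGroupKey`/`FacetGroupValue` elsewhere in this compare, not stated on this page:

```rust
// Inferred decoding of one snapshot row; not code from the repository.
struct FacetRow {
    field_id: u16,    // 3
    level: u8,        // 1 (0 = leaf)
    left_bound: f64,  // 48.9021, smallest value covered by the group
    size: u8,         // 2 children
    docids: Vec<u32>, // union of the children's docids: [17, 19]
}

fn main() {
    let row = FacetRow {
        field_id: 3,
        level: 1,
        left_bound: 48.9021,
        size: 2,
        docids: vec![17, 19],
    };
    assert_eq!(row.docids.len(), row.size as usize);
    let _ = (row.field_id, row.level, row.left_bound);
}
```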
@@ -28,7 +28,7 @@ use crate::{DocumentId, FieldId, Index, Result, MAX_FACET_VALUE_LENGTH};
 pub struct FacetedExtractorData<'a, 'b> {
     attributes_to_extract: &'a [&'a str],
     sender: &'a FieldIdDocidFacetSender<'a, 'b>,
-    grenad_parameters: &'a GrenadParameters,
+    grenad_parameters: GrenadParameters,
     buckets: usize,
 }

@@ -374,6 +374,7 @@ fn truncate_str(s: &str) -> &str {
 impl FacetedDocidsExtractor {
     #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract::faceted")]
     pub fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>(
+        grenad_parameters: GrenadParameters,
         document_changes: &DC,
         indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>,
         extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,

@@ -397,7 +398,7 @@ impl FacetedDocidsExtractor {

         let extractor = FacetedExtractorData {
             attributes_to_extract: &attributes_to_extract,
-            grenad_parameters: indexing_context.grenad_parameters,
+            grenad_parameters,
             buckets: rayon::current_num_threads(),
             sender,
         };
@@ -18,10 +18,12 @@ pub use vectors::EmbeddingExtractor;
 use super::indexer::document_changes::{DocumentChanges, IndexingContext};
 use super::steps::IndexingStep;
 use super::thread_local::{FullySend, ThreadLocal};
+use crate::update::GrenadParameters;
 use crate::Result;

 pub trait DocidsExtractor {
     fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>(
+        grenad_parameters: GrenadParameters,
         document_changes: &DC,
         indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>,
         extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
@@ -208,7 +208,7 @@ impl<'extractor> WordDocidsCaches<'extractor> {

 pub struct WordDocidsExtractorData<'a> {
     tokenizer: &'a DocumentTokenizer<'a>,
-    grenad_parameters: &'a GrenadParameters,
+    grenad_parameters: GrenadParameters,
     buckets: usize,
 }

@@ -240,6 +240,7 @@ pub struct WordDocidsExtractors;

 impl WordDocidsExtractors {
     pub fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>(
+        grenad_parameters: GrenadParameters,
         document_changes: &DC,
         indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>,
         extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,

@@ -287,7 +288,7 @@ impl WordDocidsExtractors {

         let extractor = WordDocidsExtractorData {
             tokenizer: &document_tokenizer,
-            grenad_parameters: indexing_context.grenad_parameters,
+            grenad_parameters,
             buckets: rayon::current_num_threads(),
         };
@@ -24,7 +24,7 @@ use crate::{Index, Result, MAX_POSITION_PER_ATTRIBUTE};

 pub struct SearchableExtractorData<'a, EX: SearchableExtractor> {
     tokenizer: &'a DocumentTokenizer<'a>,
-    grenad_parameters: &'a GrenadParameters,
+    grenad_parameters: GrenadParameters,
     buckets: usize,
     _ex: PhantomData<EX>,
 }

@@ -57,6 +57,7 @@ impl<'a, 'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor>

 pub trait SearchableExtractor: Sized + Sync {
     fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>(
+        grenad_parameters: GrenadParameters,
         document_changes: &DC,
         indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>,
         extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,

@@ -95,7 +96,7 @@ pub trait SearchableExtractor: Sized + Sync {

         let extractor_data: SearchableExtractorData<Self> = SearchableExtractorData {
             tokenizer: &document_tokenizer,
-            grenad_parameters: indexing_context.grenad_parameters,
+            grenad_parameters,
             buckets: rayon::current_num_threads(),
             _ex: PhantomData,
         };

@@ -133,6 +134,7 @@ pub trait SearchableExtractor: Sized + Sync {

 impl<T: SearchableExtractor> DocidsExtractor for T {
     fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>(
+        grenad_parameters: GrenadParameters,
         document_changes: &DC,
         indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>,
         extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,

@@ -141,6 +143,12 @@ impl<T: SearchableExtractor> DocidsExtractor for T {
     where
         MSP: Fn() -> bool + Sync,
     {
-        Self::run_extraction(document_changes, indexing_context, extractor_allocs, step)
+        Self::run_extraction(
+            grenad_parameters,
+            document_changes,
+            indexing_context,
+            extractor_allocs,
+            step,
+        )
     }
 }
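A recurring pattern across the extractor hunks above: `GrenadParameters` moves from a borrowed `&'a GrenadParameters` field (and a field on `IndexingContext`) to an explicit by-value argument threaded through `DocidsExtractor::run_extraction`. Assuming the struct is a small, cheaply copyable configuration object (its definition is not shown in this compare), passing it by value removes a lifetime parameter from every extractor. A minimal sketch of the before/after shape with a stand-in config type:

```rust
// Stand-in config type; milli's real GrenadParameters is not shown in
// this diff, only that it can now be passed by value.
#[derive(Clone, Copy, Default)]
struct Params {
    max_memory: Option<usize>,
}

// Before: the struct borrows the config, dragging a lifetime along.
struct BorrowingExtractor<'a> {
    params: &'a Params,
}

// After: the struct owns a copy, so no lifetime is needed and the
// extractor is no longer tied to the context that created it.
struct OwningExtractor {
    params: Params,
}

fn main() {
    let params = Params::default();
    let old = BorrowingExtractor { params: &params };
    let new = OwningExtractor { params }; // copied, not borrowed
    let _ = (old.params.max_memory, new.params.max_memory);
}
```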
@@ -1,187 +0,0 @@ (entire file removed)
use std::cmp::Ordering;

use heed::types::{Bytes, DecodeIgnore, Str};
use heed::RwTxn;
use itertools::{merge_join_by, EitherOrBoth};

use super::document_changes::IndexingContext;
use crate::facet::FacetType;
use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY};
use crate::update::del_add::DelAdd;
use crate::update::new::facet_search_builder::FacetSearchBuilder;
use crate::update::new::steps::IndexingStep;
use crate::update::new::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder};
use crate::update::new::words_prefix_docids::{
    compute_exact_word_prefix_docids, compute_word_prefix_docids, compute_word_prefix_fid_docids,
    compute_word_prefix_position_docids,
};
use crate::update::new::FacetFieldIdsDelta;
use crate::update::{FacetsUpdateBulk, GrenadParameters};
use crate::{GlobalFieldsIdsMap, Index, Result};

pub(super) fn postprocess<MSP>(
    indexing_context: IndexingContext<MSP>,
    wtxn: &mut RwTxn<'_>,
    global_fields_ids_map: GlobalFieldsIdsMap<'_>,
    facet_field_ids_delta: FacetFieldIdsDelta,
) -> Result<()>
where
    MSP: Fn() -> bool + Sync,
{
    let index = indexing_context.index;
    indexing_context.progress.update_progress(IndexingStep::PostProcessingFacets);
    if index.facet_search(wtxn)? {
        compute_facet_search_database(index, wtxn, global_fields_ids_map)?;
    }
    compute_facet_level_database(index, wtxn, facet_field_ids_delta)?;
    indexing_context.progress.update_progress(IndexingStep::PostProcessingWords);
    if let Some(prefix_delta) = compute_word_fst(index, wtxn)? {
        compute_prefix_database(index, wtxn, prefix_delta, indexing_context.grenad_parameters)?;
    };
    Ok(())
}

#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
fn compute_prefix_database(
    index: &Index,
    wtxn: &mut RwTxn,
    prefix_delta: PrefixDelta,
    grenad_parameters: &GrenadParameters,
) -> Result<()> {
    let PrefixDelta { modified, deleted } = prefix_delta;
    // Compute word prefix docids
    compute_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
    // Compute exact word prefix docids
    compute_exact_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
    // Compute word prefix fid docids
    compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
    // Compute word prefix position docids
    compute_word_prefix_position_docids(wtxn, index, &modified, &deleted, grenad_parameters)
}

#[tracing::instrument(level = "trace", skip_all, target = "indexing")]
fn compute_word_fst(index: &Index, wtxn: &mut RwTxn) -> Result<Option<PrefixDelta>> {
    let rtxn = index.read_txn()?;
    let words_fst = index.words_fst(&rtxn)?;
    let mut word_fst_builder = WordFstBuilder::new(&words_fst)?;
    let prefix_settings = index.prefix_settings(&rtxn)?;
    word_fst_builder.with_prefix_settings(prefix_settings);

    let previous_words = index.word_docids.iter(&rtxn)?.remap_data_type::<Bytes>();
    let current_words = index.word_docids.iter(wtxn)?.remap_data_type::<Bytes>();
    for eob in merge_join_by(previous_words, current_words, |lhs, rhs| match (lhs, rhs) {
        (Ok((l, _)), Ok((r, _))) => l.cmp(r),
        (Err(_), _) | (_, Err(_)) => Ordering::Equal,
    }) {
        match eob {
            EitherOrBoth::Both(lhs, rhs) => {
                let (word, lhs_bytes) = lhs?;
                let (_, rhs_bytes) = rhs?;
                if lhs_bytes != rhs_bytes {
                    word_fst_builder.register_word(DelAdd::Addition, word.as_ref())?;
                }
            }
            EitherOrBoth::Left(result) => {
                let (word, _) = result?;
                word_fst_builder.register_word(DelAdd::Deletion, word.as_ref())?;
            }
            EitherOrBoth::Right(result) => {
                let (word, _) = result?;
                word_fst_builder.register_word(DelAdd::Addition, word.as_ref())?;
            }
        }
    }

    let (word_fst_mmap, prefix_data) = word_fst_builder.build(index, &rtxn)?;
    index.main.remap_types::<Str, Bytes>().put(wtxn, WORDS_FST_KEY, &word_fst_mmap)?;
    if let Some(PrefixData { prefixes_fst_mmap, prefix_delta }) = prefix_data {
        index.main.remap_types::<Str, Bytes>().put(
            wtxn,
            WORDS_PREFIXES_FST_KEY,
            &prefixes_fst_mmap,
        )?;
        Ok(Some(prefix_delta))
    } else {
        Ok(None)
    }
}

#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_search")]
fn compute_facet_search_database(
    index: &Index,
    wtxn: &mut RwTxn,
    global_fields_ids_map: GlobalFieldsIdsMap,
) -> Result<()> {
    let rtxn = index.read_txn()?;
    let localized_attributes_rules = index.localized_attributes_rules(&rtxn)?;
    let mut facet_search_builder = FacetSearchBuilder::new(
        global_fields_ids_map,
        localized_attributes_rules.unwrap_or_default(),
    );

    let previous_facet_id_string_docids = index
        .facet_id_string_docids
        .iter(&rtxn)?
        .remap_data_type::<DecodeIgnore>()
        .filter(|r| r.as_ref().map_or(true, |(k, _)| k.level == 0));
    let current_facet_id_string_docids = index
        .facet_id_string_docids
        .iter(wtxn)?
        .remap_data_type::<DecodeIgnore>()
        .filter(|r| r.as_ref().map_or(true, |(k, _)| k.level == 0));
    for eob in merge_join_by(
        previous_facet_id_string_docids,
        current_facet_id_string_docids,
        |lhs, rhs| match (lhs, rhs) {
            (Ok((l, _)), Ok((r, _))) => l.cmp(r),
            (Err(_), _) | (_, Err(_)) => Ordering::Equal,
        },
    ) {
        match eob {
            EitherOrBoth::Both(lhs, rhs) => {
                let (_, _) = lhs?;
                let (_, _) = rhs?;
            }
            EitherOrBoth::Left(result) => {
                let (key, _) = result?;
                facet_search_builder.register_from_key(DelAdd::Deletion, key)?;
            }
            EitherOrBoth::Right(result) => {
                let (key, _) = result?;
                facet_search_builder.register_from_key(DelAdd::Addition, key)?;
            }
        }
    }

    facet_search_builder.merge_and_write(index, wtxn, &rtxn)
}

#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_field_ids")]
fn compute_facet_level_database(
    index: &Index,
    wtxn: &mut RwTxn,
    facet_field_ids_delta: FacetFieldIdsDelta,
) -> Result<()> {
    if let Some(modified_facet_string_ids) = facet_field_ids_delta.modified_facet_string_ids() {
        let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string");
        let _entered = span.enter();
        FacetsUpdateBulk::new_not_updating_level_0(
            index,
            modified_facet_string_ids,
            FacetType::String,
        )
        .execute(wtxn)?;
    }
    if let Some(modified_facet_number_ids) = facet_field_ids_delta.modified_facet_number_ids() {
        let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number");
        let _entered = span.enter();
        FacetsUpdateBulk::new_not_updating_level_0(
            index,
            modified_facet_number_ids,
            FacetType::Number,
        )
        .execute(wtxn)?;
    }

    Ok(())
}
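One pattern worth noting in the removed `compute_word_fst`: it diffs two sorted LMDB iterators (the words before and after the update) with itertools' `merge_join_by`, emitting a deletion, an addition, or a comparison for each key. A minimal self-contained sketch of that diffing pattern, with plain sorted slices standing in for the database iterators:

```rust
use itertools::{merge_join_by, EitherOrBoth};

fn main() {
    let before = ["apple", "banana", "cherry"];
    let after = ["banana", "cherry", "durian"];

    // Walk both sorted streams once; each item tells us whether a key
    // was removed (Left), added (Right), or present in both (Both).
    for eob in merge_join_by(before.iter(), after.iter(), |l, r| l.cmp(r)) {
        match eob {
            EitherOrBoth::Both(w, _) => println!("kept:    {w}"),
            EitherOrBoth::Left(w) => println!("deleted: {w}"),
            EitherOrBoth::Right(w) => println!("added:   {w}"),
        }
    }
}
```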
@@ -12,7 +12,6 @@ use crate::progress::{AtomicDocumentStep, Progress};
 use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
 use crate::update::new::steps::IndexingStep;
 use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
-use crate::update::GrenadParameters;
 use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result};

 pub struct DocumentChangeContext<

@@ -146,7 +145,6 @@ pub struct IndexingContext<
     pub fields_ids_map_store: &'indexer ThreadLocal<FullySend<RefCell<GlobalFieldsIdsMap<'fid>>>>,
     pub must_stop_processing: &'indexer MSP,
     pub progress: &'indexer Progress,
-    pub grenad_parameters: &'indexer GrenadParameters,
 }

 impl<

@@ -209,7 +207,6 @@ pub fn extract<
         fields_ids_map_store,
         must_stop_processing,
         progress,
-        grenad_parameters: _,
     }: IndexingContext<'fid, 'indexer, 'index, MSP>,
     extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
     datastore: &'data ThreadLocal<EX::Data>,

@@ -166,7 +166,6 @@ mod test {
             fields_ids_map_store: &fields_ids_map_store,
             must_stop_processing: &(|| false),
            progress: &Progress::default(),
-            grenad_parameters: &Default::default(),
         };

         for _ in 0..3 {
@@ -13,7 +13,7 @@ use serde_json::Deserializer;

 use super::super::document_change::DocumentChange;
 use super::document_changes::{DocumentChangeContext, DocumentChanges};
-use super::guess_primary_key::retrieve_or_guess_primary_key;
+use super::retrieve_or_guess_primary_key;
 use crate::documents::PrimaryKey;
 use crate::progress::{AtomicPayloadStep, Progress};
 use crate::update::new::document::Versions;

@@ -252,6 +252,24 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
         previous_offset = iter.byte_offset();
     }

+    if payload.is_empty() {
+        let result = retrieve_or_guess_primary_key(
+            rtxn,
+            index,
+            new_fields_ids_map,
+            primary_key_from_op,
+            None,
+        );
+        match result {
+            Ok(Ok((pk, _))) => {
+                primary_key.get_or_insert(pk);
+            }
+            Ok(Err(UserError::NoPrimaryKeyCandidateFound)) => (),
+            Ok(Err(user_error)) => return Err(Error::UserError(user_error)),
+            Err(error) => return Err(error),
+        };
+    }
+
     Ok(new_docids_version_offsets)
 }
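Taken together with the index-scheduler hunk near the top (which stops filtering out empty mmaps), this is the path that lets an empty `[]` payload still produce a succeeded task: the per-document loop never runs, so the primary key is resolved explicitly. A standalone sketch of that control flow with deliberately simplified stand-in types; milli's real `Error`/`UserError` enums and the `retrieve_or_guess_primary_key` signature are richer than this:

```rust
// Simplified stand-ins; not the repository's types.
#[derive(Debug)]
enum UserError {
    NoPrimaryKeyCandidateFound,
    MissingDocumentId,
}
#[derive(Debug)]
enum Error {
    UserError(UserError),
}

// Pretend resolver: Ok(Ok(pk)) when a key is known or guessable,
// Ok(Err(..)) when no candidate exists (fine for an empty payload).
fn retrieve_or_guess_primary_key(
    from_op: Option<&str>,
) -> Result<Result<String, UserError>, Error> {
    Ok(match from_op {
        Some(pk) => Ok(pk.to_string()),
        None => Err(UserError::NoPrimaryKeyCandidateFound),
    })
}

fn extract(payload: &[u8], pk_from_op: Option<&str>) -> Result<Option<String>, Error> {
    let mut primary_key = None;
    // ... the per-document loop would set `primary_key` here ...
    if payload.is_empty() {
        // An empty payload has no document to guess from, so the task
        // must not fail just because the loop above never ran.
        match retrieve_or_guess_primary_key(pk_from_op) {
            Ok(Ok(pk)) => {
                primary_key.get_or_insert(pk);
            }
            Ok(Err(UserError::NoPrimaryKeyCandidateFound)) => (),
            Ok(Err(user_error)) => return Err(Error::UserError(user_error)),
            Err(error) => return Err(error),
        }
    }
    Ok(primary_key)
}

fn main() {
    assert_eq!(extract(b"", Some("id")).unwrap().as_deref(), Some("id"));
    assert!(extract(b"", None).unwrap().is_none());
}
```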
@@ -1,309 +0,0 @@ (entire file removed; the mirror page truncates before the file's final lines)
use std::collections::BTreeMap;
use std::sync::atomic::AtomicBool;
use std::sync::OnceLock;

use bumpalo::Bump;
use roaring::RoaringBitmap;
use tracing::Span;

use super::super::channel::*;
use super::super::extract::*;
use super::super::steps::IndexingStep;
use super::super::thread_local::{FullySend, ThreadLocal};
use super::super::FacetFieldIdsDelta;
use super::document_changes::{extract, DocumentChanges, IndexingContext};
use crate::index::IndexEmbeddingConfig;
use crate::proximity::ProximityPrecision;
use crate::update::new::extract::EmbeddingExtractor;
use crate::update::new::merger::merge_and_send_rtree;
use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases};
use crate::vector::EmbeddingConfigs;
use crate::{Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};

#[allow(clippy::too_many_arguments)]
pub(super) fn extract_all<'pl, 'extractor, DC, MSP>(
    document_changes: &DC,
    indexing_context: IndexingContext<MSP>,
    indexer_span: Span,
    extractor_sender: ExtractorBbqueueSender,
    embedders: &EmbeddingConfigs,
    extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
    finished_extraction: &AtomicBool,
    field_distribution: &mut BTreeMap<String, u64>,
    mut index_embeddings: Vec<IndexEmbeddingConfig>,
    document_ids: &mut RoaringBitmap,
) -> Result<(FacetFieldIdsDelta, Vec<IndexEmbeddingConfig>)>
where
    DC: DocumentChanges<'pl>,
    MSP: Fn() -> bool + Sync,
{
    let span =
        tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract");
    let _entered = span.enter();

    let index = indexing_context.index;
    let rtxn = index.read_txn()?;

    // document but we need to create a function that collects and compresses documents.
    let document_sender = extractor_sender.documents();
    let document_extractor = DocumentsExtractor::new(document_sender, embedders);
    let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
    {
        let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "documents");
        let _entered = span.enter();
        extract(
            document_changes,
            &document_extractor,
            indexing_context,
            extractor_allocs,
            &datastore,
            IndexingStep::ExtractingDocuments,
        )?;
    }
    {
        let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "documents");
        let _entered = span.enter();
        for document_extractor_data in datastore {
            let document_extractor_data = document_extractor_data.0.into_inner();
            for (field, delta) in document_extractor_data.field_distribution_delta {
                let current = field_distribution.entry(field).or_default();
                // adding the delta should never cause a negative result, as we are removing fields that previously existed.
                *current = current.saturating_add_signed(delta);
            }
            document_extractor_data.docids_delta.apply_to(document_ids);
        }

        field_distribution.retain(|_, v| *v != 0);
    }

    let facet_field_ids_delta;

    {
        let caches = {
            let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "faceted");
            let _entered = span.enter();

            FacetedDocidsExtractor::run_extraction(
                document_changes,
                indexing_context,
                extractor_allocs,
                &extractor_sender.field_id_docid_facet_sender(),
                IndexingStep::ExtractingFacets,
            )?
        };

        {
            let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "faceted");
            let _entered = span.enter();

            facet_field_ids_delta = merge_and_send_facet_docids(
                caches,
                FacetDatabases::new(index),
                index,
                extractor_sender.facet_docids(),
            )?;
        }
    }

    {
        let WordDocidsCaches {
            word_docids,
            word_fid_docids,
            exact_word_docids,
            word_position_docids,
            fid_word_count_docids,
        } = {
            let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
            let _entered = span.enter();

            WordDocidsExtractors::run_extraction(
                document_changes,
                indexing_context,
                extractor_allocs,
                IndexingStep::ExtractingWords,
            )?
        };

        {
            let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
            let _entered = span.enter();
            merge_and_send_docids(
                word_docids,
                index.word_docids.remap_types(),
                index,
                extractor_sender.docids::<WordDocids>(),
                &indexing_context.must_stop_processing,
            )?;
        }

        {
            let span =
                tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids");
            let _entered = span.enter();
            merge_and_send_docids(
                word_fid_docids,
                index.word_fid_docids.remap_types(),
                index,
                extractor_sender.docids::<WordFidDocids>(),
                &indexing_context.must_stop_processing,
            )?;
        }

        {
            let span =
                tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids");
            let _entered = span.enter();
            merge_and_send_docids(
                exact_word_docids,
                index.exact_word_docids.remap_types(),
                index,
                extractor_sender.docids::<ExactWordDocids>(),
                &indexing_context.must_stop_processing,
            )?;
        }

        {
            let span =
                tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids");
            let _entered = span.enter();
            merge_and_send_docids(
                word_position_docids,
                index.word_position_docids.remap_types(),
                index,
                extractor_sender.docids::<WordPositionDocids>(),
                &indexing_context.must_stop_processing,
            )?;
        }

        {
            let span =
                tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids");
            let _entered = span.enter();
            merge_and_send_docids(
                fid_word_count_docids,
                index.field_id_word_count_docids.remap_types(),
                index,
                extractor_sender.docids::<FidWordCountDocids>(),
                &indexing_context.must_stop_processing,
            )?;
        }
    }

    // run the proximity extraction only if the precision is by word
    // this works only if the settings didn't change during this transaction.
    let proximity_precision = index.proximity_precision(&rtxn)?.unwrap_or_default();
    if proximity_precision == ProximityPrecision::ByWord {
        let caches = {
            let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids");
            let _entered = span.enter();

            <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(
                document_changes,
                indexing_context,
                extractor_allocs,
                IndexingStep::ExtractingWordProximity,
            )?
        };

        {
            let span = tracing::trace_span!(target: "indexing::documents::merge", "word_pair_proximity_docids");
            let _entered = span.enter();

            merge_and_send_docids(
                caches,
                index.word_pair_proximity_docids.remap_types(),
                index,
                extractor_sender.docids::<WordPairProximityDocids>(),
                &indexing_context.must_stop_processing,
            )?;
        }
    }

    'vectors: {
        if index_embeddings.is_empty() {
            break 'vectors;
        }

        let embedding_sender = extractor_sender.embeddings();
        let extractor = EmbeddingExtractor::new(
            embedders,
            embedding_sender,
            field_distribution,
            request_threads(),
        );
        let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
        {
            let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors");
            let _entered = span.enter();

            extract(
                document_changes,
                &extractor,
                indexing_context,
                extractor_allocs,
                &datastore,
                IndexingStep::ExtractingEmbeddings,
            )?;
        }
        {
            let span = tracing::trace_span!(target: "indexing::documents::merge", "vectors");
            let _entered = span.enter();

            for config in &mut index_embeddings {
                'data: for data in datastore.iter_mut() {
                    let data = &mut data.get_mut().0;
                    let Some(deladd) = data.remove(&config.name) else {
                        continue 'data;
                    };
                    deladd.apply_to(&mut config.user_provided);
                }
            }
        }
    }

    'geo: {
        let Some(extractor) = GeoExtractor::new(&rtxn, index, *indexing_context.grenad_parameters)?
        else {
            break 'geo;
        };
        let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());

        {
            let span = tracing::trace_span!(target: "indexing::documents::extract", "geo");
            let _entered = span.enter();

            extract(
                document_changes,
                &extractor,
                indexing_context,
                extractor_allocs,
                &datastore,
                IndexingStep::WritingGeoPoints,
            )?;
        }

        merge_and_send_rtree(
            datastore,
            &rtxn,
            index,
            extractor_sender.geo(),
            &indexing_context.must_stop_processing,
        )?;
    }
    indexing_context.progress.update_progress(IndexingStep::WritingToDatabase);
    finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed);

    Result::Ok((facet_field_ids_delta, index_embeddings))
|
||||
}
|
||||
|
||||
fn request_threads() -> &'static ThreadPoolNoAbort {
|
||||
static REQUEST_THREADS: OnceLock<ThreadPoolNoAbort> = OnceLock::new();
|
||||
|
||||
REQUEST_THREADS.get_or_init(|| {
|
||||
ThreadPoolNoAbortBuilder::new()
|
||||
.num_threads(crate::vector::REQUEST_PARALLELISM)
|
||||
.thread_name(|index| format!("embedding-request-{index}"))
|
||||
.build()
|
||||
.unwrap()
|
||||
})
|
||||
}
|
||||
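The `request_threads()` helper above is a lazily-initialized, process-wide singleton: the pool for embedding requests is built on first use and then shared for the life of the process. A minimal, self-contained sketch of the same `OnceLock` idiom, with a hypothetical string resource standing in for the thread pool:

use std::sync::OnceLock;

// Hypothetical stand-in for an expensive shared resource such as a pool.
fn shared_config() -> &'static String {
    static CONFIG: OnceLock<String> = OnceLock::new();
    // `get_or_init` runs the closure at most once, even when several
    // threads race on the first call; later calls return the cached value.
    CONFIG.get_or_init(|| "built exactly once".to_string())
}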
@@ -1,85 +0,0 @@
use bumparaw_collections::RawMap;
use heed::RoTxn;
use rustc_hash::FxBuildHasher;

use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY};
use crate::update::new::StdResult;
use crate::{FieldsIdsMap, Index, Result, UserError};

/// Returns the primary key that has already been set for this index or the
/// one we will guess by searching for the first key that contains "id" as a substring,
/// and whether the primary key changed
pub fn retrieve_or_guess_primary_key<'a>(
    rtxn: &'a RoTxn<'a>,
    index: &Index,
    new_fields_ids_map: &mut FieldsIdsMap,
    primary_key_from_op: Option<&'a str>,
    first_document: Option<RawMap<'a, FxBuildHasher>>,
) -> Result<StdResult<(PrimaryKey<'a>, bool), UserError>> {
    // make sure that we have a declared primary key, either fetching it from the index or attempting to guess it.

    // do we have an existing declared primary key?
    let (primary_key, has_changed) = if let Some(primary_key_from_db) = index.primary_key(rtxn)? {
        // did we request a primary key in the operation?
        match primary_key_from_op {
            // we did, and it is different from the DB one
            Some(primary_key_from_op) if primary_key_from_op != primary_key_from_db => {
                return Ok(Err(UserError::PrimaryKeyCannotBeChanged(
                    primary_key_from_db.to_string(),
                )));
            }
            _ => (primary_key_from_db, false),
        }
    } else {
        // no primary key in the DB => let's set one
        // did we request a primary key in the operation?
        let primary_key = if let Some(primary_key_from_op) = primary_key_from_op {
            // set primary key from operation
            primary_key_from_op
        } else {
            // guess primary key
            let first_document = match first_document {
                Some(document) => document,
                // previous indexer when no pk is set + we send an empty payload => index_primary_key_no_candidate_found
                None => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)),
            };

            let guesses: Result<Vec<&str>> = first_document
                .keys()
                .filter_map(|name| {
                    let Some(_) = new_fields_ids_map.insert(name) else {
                        return Some(Err(UserError::AttributeLimitReached.into()));
                    };
                    name.to_lowercase().ends_with(DEFAULT_PRIMARY_KEY).then_some(Ok(name))
                })
                .collect();

            let mut guesses = guesses?;

            // sort the keys in lexicographical order, so that fields are always in the same order.
            guesses.sort_unstable();

            match guesses.as_slice() {
                [] => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)),
                [name] => {
                    tracing::info!("Primary key was not specified in index. Inferred to '{name}'");
                    *name
                }
                multiple => {
                    return Ok(Err(UserError::MultiplePrimaryKeyCandidatesFound {
                        candidates: multiple
                            .iter()
                            .map(|candidate| candidate.to_string())
                            .collect(),
                    }))
                }
            }
        };
        (primary_key, true)
    };

    match PrimaryKey::new_or_insert(primary_key, new_fields_ids_map) {
        Ok(primary_key) => Ok(Ok((primary_key, has_changed))),
        Err(err) => Ok(Err(err)),
    }
}
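Note the nested return type above: the outer `Result` carries internal errors, while the inner `StdResult<_, UserError>` lets a caller surface user-facing failures (no candidate, multiple candidates, changed key) without treating them as internal faults. A hedged sketch of a call site; `rtxn`, `index`, `fields_ids_map`, and `first_doc` are assumed from surrounding indexer code and are not defined here:

// Sketch only: `?` propagates internal failures on the outer Result,
// while the inner StdResult carries user-facing errors separately.
let (primary_key, pk_changed) =
    match retrieve_or_guess_primary_key(&rtxn, index, &mut fields_ids_map, None, first_doc)? {
        Ok((pk, changed)) => (pk, changed),
        Err(user_error) => return Err(user_error.into()),
    };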
@@ -1,37 +1,61 @@
use std::cmp::Ordering;
use std::sync::atomic::AtomicBool;
use std::sync::RwLock;
use std::sync::{OnceLock, RwLock};
use std::thread::{self, Builder};

use big_s::S;
use document_changes::{DocumentChanges, IndexingContext};
use bumparaw_collections::RawMap;
use document_changes::{extract, DocumentChanges, IndexingContext};
pub use document_deletion::DocumentDeletion;
pub use document_operation::{DocumentOperation, PayloadStats};
use hashbrown::HashMap;
use heed::RwTxn;
use heed::types::{Bytes, DecodeIgnore, Str};
use heed::{RoTxn, RwTxn};
use itertools::{merge_join_by, EitherOrBoth};
pub use partial_dump::PartialDump;
use rand::SeedableRng as _;
use rustc_hash::FxBuildHasher;
use time::OffsetDateTime;
pub use update_by_function::UpdateByFunction;
use write::{build_vectors, update_index, write_to_db};

use super::channel::*;
use super::extract::*;
use super::facet_search_builder::FacetSearchBuilder;
use super::merger::FacetFieldIdsDelta;
use super::steps::IndexingStep;
use super::thread_local::ThreadLocal;
use crate::documents::PrimaryKey;
use super::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder};
use super::words_prefix_docids::{
    compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids,
};
use super::StdResult;
use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY};
use crate::facet::FacetType;
use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY};
use crate::progress::Progress;
use crate::update::GrenadParameters;
use crate::vector::{ArroyWrapper, EmbeddingConfigs};
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort};
use crate::proximity::ProximityPrecision;
use crate::update::del_add::DelAdd;
use crate::update::facet::new_incremental::FacetsUpdateIncremental;
use crate::update::facet::{FACET_GROUP_SIZE, FACET_MAX_GROUP_SIZE, FACET_MIN_LEVEL_SIZE};
use crate::update::new::extract::EmbeddingExtractor;
use crate::update::new::merger::merge_and_send_rtree;
use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids;
use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases};
use crate::update::settings::InnerIndexSettings;
use crate::update::{FacetsUpdateBulk, GrenadParameters};
use crate::vector::{ArroyWrapper, EmbeddingConfigs, Embeddings};
use crate::{
    Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort,
    ThreadPoolNoAbortBuilder, UserError,
};

mod compute;
pub(crate) mod de;
pub mod document_changes;
mod document_deletion;
mod document_operation;
mod extract;
mod guess_primary_key;
mod partial_dump;
mod update_by_function;
mod write;

/// This is the main function of this crate.
///
@@ -85,7 +109,7 @@ where
        },
    );

    let (extractor_sender, writer_receiver) = pool
    let (extractor_sender, mut writer_receiver) = pool
        .install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
        .unwrap();

@@ -104,10 +128,9 @@ where
        fields_ids_map_store: &fields_ids_map_store,
        must_stop_processing,
        progress,
        grenad_parameters: &grenad_parameters,
    };

    let index_embeddings = index.embedding_configs(wtxn)?;
    let mut index_embeddings = index.embedding_configs(wtxn)?;
    let mut field_distribution = index.field_distribution(wtxn)?;
    let mut document_ids = index.documents_ids(wtxn)?;

@@ -118,28 +141,262 @@ where
        // prevent moving the field_distribution and document_ids in the inner closure...
        let field_distribution = &mut field_distribution;
        let document_ids = &mut document_ids;
        let extractor_handle =
            Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
                pool.install(move || {
                    extract::extract_all(
        let extractor_handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
            pool.install(move || {
                let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract");
                let _entered = span.enter();

                let rtxn = index.read_txn()?;

                // document but we need to create a function that collects and compresses documents.
                let document_sender = extractor_sender.documents();
                let document_extractor = DocumentsExtractor::new(document_sender, embedders);
                let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
                {
                    let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "documents");
                    let _entered = span.enter();
                    extract(
                        document_changes,
                        &document_extractor,
                        indexing_context,
                        indexer_span,
                        extractor_sender,
                        embedders,
                        &mut extractor_allocs,
                        finished_extraction,
                        field_distribution,
                        index_embeddings,
                        document_ids,
                    )
                })
                .unwrap()
            })?;
                        &datastore,
                        IndexingStep::ExtractingDocuments,
                    )?;
                }
                {
                    let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "documents");
                    let _entered = span.enter();
                    for document_extractor_data in datastore {
                        let document_extractor_data = document_extractor_data.0.into_inner();
                        for (field, delta) in document_extractor_data.field_distribution_delta {
                            let current = field_distribution.entry(field).or_default();
                            // adding the delta should never cause a negative result, as we are removing fields that previously existed.
                            *current = current.saturating_add_signed(delta);
                        }
                        document_extractor_data.docids_delta.apply_to(document_ids);
                    }

                    field_distribution.retain(|_, v| *v != 0);
                }

                let facet_field_ids_delta;

                {
                    let caches = {
                        let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "faceted");
                        let _entered = span.enter();

                        FacetedDocidsExtractor::run_extraction(
                            grenad_parameters,
                            document_changes,
                            indexing_context,
                            &mut extractor_allocs,
                            &extractor_sender.field_id_docid_facet_sender(),
                            IndexingStep::ExtractingFacets
                        )?
                    };

                    {
                        let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "faceted");
                        let _entered = span.enter();

                        facet_field_ids_delta = merge_and_send_facet_docids(
                            caches,
                            FacetDatabases::new(index),
                            index,
                            &rtxn,
                            extractor_sender.facet_docids(),
                        )?;
                    }
                }

                {
                    let WordDocidsCaches {
                        word_docids,
                        word_fid_docids,
                        exact_word_docids,
                        word_position_docids,
                        fid_word_count_docids,
                    } = {
                        let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids");
                        let _entered = span.enter();

                        WordDocidsExtractors::run_extraction(
                            grenad_parameters,
                            document_changes,
                            indexing_context,
                            &mut extractor_allocs,
                            IndexingStep::ExtractingWords
                        )?
                    };

                    {
                        let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids");
                        let _entered = span.enter();
                        merge_and_send_docids(
                            word_docids,
                            index.word_docids.remap_types(),
                            index,
                            extractor_sender.docids::<WordDocids>(),
                            &indexing_context.must_stop_processing,
                        )?;
                    }

                    {
                        let span = tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids");
                        let _entered = span.enter();
                        merge_and_send_docids(
                            word_fid_docids,
                            index.word_fid_docids.remap_types(),
                            index,
                            extractor_sender.docids::<WordFidDocids>(),
                            &indexing_context.must_stop_processing,
                        )?;
                    }

                    {
                        let span = tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids");
                        let _entered = span.enter();
                        merge_and_send_docids(
                            exact_word_docids,
                            index.exact_word_docids.remap_types(),
                            index,
                            extractor_sender.docids::<ExactWordDocids>(),
                            &indexing_context.must_stop_processing,
                        )?;
                    }

                    {
                        let span = tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids");
                        let _entered = span.enter();
                        merge_and_send_docids(
                            word_position_docids,
                            index.word_position_docids.remap_types(),
                            index,
                            extractor_sender.docids::<WordPositionDocids>(),
                            &indexing_context.must_stop_processing,
                        )?;
                    }

                    {
                        let span = tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids");
                        let _entered = span.enter();
                        merge_and_send_docids(
                            fid_word_count_docids,
                            index.field_id_word_count_docids.remap_types(),
                            index,
                            extractor_sender.docids::<FidWordCountDocids>(),
                            &indexing_context.must_stop_processing,
                        )?;
                    }
                }

                // run the proximity extraction only if the precision is by word
                // this works only if the settings didn't change during this transaction.
                let proximity_precision = index.proximity_precision(&rtxn)?.unwrap_or_default();
                if proximity_precision == ProximityPrecision::ByWord {
                    let caches = {
                        let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids");
                        let _entered = span.enter();

                        <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(
                            grenad_parameters,
                            document_changes,
                            indexing_context,
                            &mut extractor_allocs,
                            IndexingStep::ExtractingWordProximity,
                        )?
                    };

                    {
                        let span = tracing::trace_span!(target: "indexing::documents::merge", "word_pair_proximity_docids");
                        let _entered = span.enter();

                        merge_and_send_docids(
                            caches,
                            index.word_pair_proximity_docids.remap_types(),
                            index,
                            extractor_sender.docids::<WordPairProximityDocids>(),
                            &indexing_context.must_stop_processing,
                        )?;
                    }
                }

                'vectors: {
                    if index_embeddings.is_empty() {
                        break 'vectors;
                    }

                    let embedding_sender = extractor_sender.embeddings();
                    let extractor = EmbeddingExtractor::new(embedders, embedding_sender, field_distribution, request_threads());
                    let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
                    {
                        let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors");
                        let _entered = span.enter();

                        extract(
                            document_changes,
                            &extractor,
                            indexing_context,
                            &mut extractor_allocs,
                            &datastore,
                            IndexingStep::ExtractingEmbeddings,
                        )?;
                    }
                    {
                        let span = tracing::trace_span!(target: "indexing::documents::merge", "vectors");
                        let _entered = span.enter();

                        for config in &mut index_embeddings {
                            'data: for data in datastore.iter_mut() {
                                let data = &mut data.get_mut().0;
                                let Some(deladd) = data.remove(&config.name) else { continue 'data; };
                                deladd.apply_to(&mut config.user_provided);
                            }
                        }
                    }
                }

                'geo: {
                    let Some(extractor) = GeoExtractor::new(&rtxn, index, grenad_parameters)? else {
                        break 'geo;
                    };
                    let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());

                    {
                        let span = tracing::trace_span!(target: "indexing::documents::extract", "geo");
                        let _entered = span.enter();

                        extract(
                            document_changes,
                            &extractor,
                            indexing_context,
                            &mut extractor_allocs,
                            &datastore,
                            IndexingStep::WritingGeoPoints
                        )?;
                    }

                    merge_and_send_rtree(
                        datastore,
                        &rtxn,
                        index,
                        extractor_sender.geo(),
                        &indexing_context.must_stop_processing,
                    )?;
                }
                indexing_context.progress.update_progress(IndexingStep::WritingToDatabase);
                finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed);

                Result::Ok((facet_field_ids_delta, index_embeddings))
            }).unwrap()
        })?;
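Every index structure in the closure above follows the same extract-then-merge shape: a thread-local cache is produced under an "extract" tracing span, then consumed under a "merge" span, so the two costs show up separately in traces. A minimal, self-contained sketch (not meilisearch code) of that shape:

use tracing::trace_span;

// Stand-in values only: the extraction pass fills a cache inside one
// span, and the merge pass consumes it inside another.
fn two_phase() -> Vec<u32> {
    let cache = {
        let span = trace_span!(target: "indexing::documents::extract", "example");
        let _entered = span.enter();
        vec![1, 2, 3] // stand-in for an extraction pass
    };
    let span = trace_span!(target: "indexing::documents::merge", "example");
    let _entered = span.enter();
    cache.into_iter().map(|n| n * 2).collect() // stand-in for the merge pass
}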

    let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map);

    let vector_arroy = index.vector_arroy;
    let indexer_span = tracing::Span::current();
    let arroy_writers: Result<HashMap<_, _>> = embedders
        .inner_as_ref()
        .iter()
@@ -161,25 +418,114 @@ where
        })
        .collect();

    // Used by the ArroySetVector to copy the embedding into an
    // aligned memory area, required by arroy to accept a new vector.
    let mut aligned_embedding = Vec::new();
    let mut arroy_writers = arroy_writers?;

    write_to_db(writer_receiver, finished_extraction, index, wtxn, &arroy_writers)?;
    {
        let span = tracing::trace_span!(target: "indexing::write_db", "all");
        let _entered = span.enter();

        let span = tracing::trace_span!(target: "indexing::write_db", "post_merge");
        let mut _entered_post_merge = None;

        while let Some(action) = writer_receiver.recv_action() {
            if _entered_post_merge.is_none()
                && finished_extraction.load(std::sync::atomic::Ordering::Relaxed)
            {
                _entered_post_merge = Some(span.enter());
            }

            match action {
                ReceiverAction::WakeUp => (),
                ReceiverAction::LargeEntry(LargeEntry { database, key, value }) => {
                    let database_name = database.database_name();
                    let database = database.database(index);
                    if let Err(error) = database.put(wtxn, &key, &value) {
                        return Err(Error::InternalError(InternalError::StorePut {
                            database_name,
                            key: bstr::BString::from(&key[..]),
                            value_length: value.len(),
                            error,
                        }));
                    }
                }
                ReceiverAction::LargeVectors(large_vectors) => {
                    let LargeVectors { docid, embedder_id, .. } = large_vectors;
                    let (_, _, writer, dimensions) =
                        arroy_writers.get(&embedder_id).expect("requested a missing embedder");
                    let mut embeddings = Embeddings::new(*dimensions);
                    for embedding in large_vectors.read_embeddings(*dimensions) {
                        embeddings.push(embedding.to_vec()).unwrap();
                    }
                    writer.del_items(wtxn, *dimensions, docid)?;
                    writer.add_items(wtxn, docid, &embeddings)?;
                }
            }

            // Every time there is a message in the channel we search
            // for new entries in the BBQueue buffers.
            write_from_bbqueue(
                &mut writer_receiver,
                index,
                wtxn,
                &arroy_writers,
                &mut aligned_embedding,
            )?;
        }

        // Once the extractor/writer channel is closed
        // we must process the remaining BBQueue messages.
        write_from_bbqueue(
            &mut writer_receiver,
            index,
            wtxn,
            &arroy_writers,
            &mut aligned_embedding,
        )?;
    }

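The trailing `write_from_bbqueue` call above matters: when the action channel closes, frames can still be sitting in the BBQueue buffers and must be drained. A self-contained illustration of the same "drain after disconnect" idea using a standard channel:

use std::sync::mpsc;

// Once all senders hang up the channel is closed, but anything already
// buffered must still be consumed before giving up on the receiver.
fn drain_example() {
    let (tx, rx) = mpsc::channel();
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    drop(tx); // all senders gone: the channel is closed
    while let Ok(n) = rx.recv() {
        println!("still had {n} buffered");
    }
}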
    indexing_context.progress.update_progress(IndexingStep::WaitingForExtractors);

    let (facet_field_ids_delta, index_embeddings) = extractor_handle.join().unwrap()?;

    indexing_context.progress.update_progress(IndexingStep::WritingEmbeddingsToDatabase);
    'vectors: {
        let span =
            tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build");
        let _entered = span.enter();

        build_vectors(
            index,
            wtxn,
            index_embeddings,
            &mut arroy_writers,
            &indexing_context.must_stop_processing,
        )?;
        if index_embeddings.is_empty() {
            break 'vectors;
        }

        compute::postprocess(indexing_context, wtxn, global_fields_ids_map, facet_field_ids_delta)?;
        indexing_context.progress.update_progress(IndexingStep::WritingEmbeddingsToDatabase);
        let mut rng = rand::rngs::StdRng::seed_from_u64(42);
        for (_index, (_embedder_name, _embedder, writer, dimensions)) in &mut arroy_writers {
            let dimensions = *dimensions;
            writer.build_and_quantize(
                wtxn,
                &mut rng,
                dimensions,
                false,
                &indexing_context.must_stop_processing,
            )?;
        }

        index.put_embedding_configs(wtxn, index_embeddings)?;
    }

    indexing_context.progress.update_progress(IndexingStep::PostProcessingFacets);
    if index.facet_search(wtxn)? {
        compute_facet_search_database(index, wtxn, global_fields_ids_map)?;
    }

    compute_facet_level_database(index, wtxn, facet_field_ids_delta)?;

    indexing_context.progress.update_progress(IndexingStep::PostProcessingWords);
    if let Some(prefix_delta) = compute_word_fst(index, wtxn)? {
        compute_prefix_database(index, wtxn, prefix_delta, grenad_parameters)?;
    }

    indexing_context.progress.update_progress(IndexingStep::Finalizing);

@@ -190,15 +536,363 @@ where
    drop(fields_ids_map_store);

    let new_fields_ids_map = new_fields_ids_map.into_inner().unwrap();
    update_index(
        index,
        wtxn,
        new_fields_ids_map,
        new_primary_key,
        embedders,
        field_distribution,
        document_ids,
    )?;
    for (fid, name, metadata) in new_fields_ids_map.iter() {
        tracing::debug!("{fid}:{name},{metadata:?}");
    }
    index.put_fields_ids_map(wtxn, new_fields_ids_map.as_fields_ids_map())?;

    if let Some(new_primary_key) = new_primary_key {
        index.put_primary_key(wtxn, new_primary_key.name())?;
    }

    // used to update the localized and weighted maps while sharing the update code with the settings pipeline.
    let mut inner_index_settings = InnerIndexSettings::from_index(index, wtxn, Some(embedders))?;
    inner_index_settings.recompute_facets(wtxn, index)?;
    inner_index_settings.recompute_searchables(wtxn, index)?;
    index.put_field_distribution(wtxn, &field_distribution)?;
    index.put_documents_ids(wtxn, &document_ids)?;
    index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?;

    Ok(())
}

/// A function dedicated to managing all the available BBQueue frames.
///
/// It reads all the available frames, does the corresponding database operations,
/// and stops when no frames are available.
fn write_from_bbqueue(
    writer_receiver: &mut WriterBbqueueReceiver<'_>,
    index: &Index,
    wtxn: &mut RwTxn<'_>,
    arroy_writers: &HashMap<u8, (&str, &crate::vector::Embedder, ArroyWrapper, usize)>,
    aligned_embedding: &mut Vec<f32>,
) -> crate::Result<()> {
    while let Some(frame_with_header) = writer_receiver.recv_frame() {
        match frame_with_header.header() {
            EntryHeader::DbOperation(operation) => {
                let database_name = operation.database.database_name();
                let database = operation.database.database(index);
                let frame = frame_with_header.frame();
                match operation.key_value(frame) {
                    (key, Some(value)) => {
                        if let Err(error) = database.put(wtxn, key, value) {
                            return Err(Error::InternalError(InternalError::StorePut {
                                database_name,
                                key: key.into(),
                                value_length: value.len(),
                                error,
                            }));
                        }
                    }
                    (key, None) => match database.delete(wtxn, key) {
                        Ok(false) => {
                            unreachable!("We tried to delete an unknown key: {key:?}")
                        }
                        Ok(_) => (),
                        Err(error) => {
                            return Err(Error::InternalError(InternalError::StoreDeletion {
                                database_name,
                                key: key.into(),
                                error,
                            }));
                        }
                    },
                }
            }
            EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid }) => {
                for (_index, (_name, _embedder, writer, dimensions)) in arroy_writers {
                    let dimensions = *dimensions;
                    writer.del_items(wtxn, dimensions, docid)?;
                }
            }
            EntryHeader::ArroySetVectors(asvs) => {
                let ArroySetVectors { docid, embedder_id, .. } = asvs;
                let frame = frame_with_header.frame();
                let (_, _, writer, dimensions) =
                    arroy_writers.get(&embedder_id).expect("requested a missing embedder");
                let mut embeddings = Embeddings::new(*dimensions);
                let all_embeddings = asvs.read_all_embeddings_into_vec(frame, aligned_embedding);
                embeddings.append(all_embeddings.to_vec()).unwrap();
                writer.del_items(wtxn, *dimensions, docid)?;
                writer.add_items(wtxn, docid, &embeddings)?;
            }
        }
    }

    Ok(())
}

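In both vector branches above the write is a delete-then-add: existing items for the docid are cleared before the new embeddings are inserted, which makes the operation behave like an idempotent upsert. A self-contained analogue with an ordinary map:

use std::collections::HashMap;

// Clearing old entries first means the write has the same effect
// whether or not the docid already had vectors stored.
fn upsert(store: &mut HashMap<u32, Vec<Vec<f32>>>, docid: u32, vectors: Vec<Vec<f32>>) {
    store.remove(&docid); // del_items equivalent
    store.insert(docid, vectors); // add_items equivalent
}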
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
|
||||
fn compute_prefix_database(
|
||||
index: &Index,
|
||||
wtxn: &mut RwTxn,
|
||||
prefix_delta: PrefixDelta,
|
||||
grenad_parameters: GrenadParameters,
|
||||
) -> Result<()> {
|
||||
let PrefixDelta { modified, deleted } = prefix_delta;
|
||||
// Compute word prefix docids
|
||||
compute_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
|
||||
// Compute exact word prefix docids
|
||||
compute_exact_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
|
||||
// Compute word prefix fid docids
|
||||
compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
|
||||
// Compute word prefix position docids
|
||||
compute_word_prefix_position_docids(wtxn, index, &modified, &deleted, grenad_parameters)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing")]
|
||||
fn compute_word_fst(index: &Index, wtxn: &mut RwTxn) -> Result<Option<PrefixDelta>> {
|
||||
let rtxn = index.read_txn()?;
|
||||
let words_fst = index.words_fst(&rtxn)?;
|
||||
let mut word_fst_builder = WordFstBuilder::new(&words_fst)?;
|
||||
let prefix_settings = index.prefix_settings(&rtxn)?;
|
||||
word_fst_builder.with_prefix_settings(prefix_settings);
|
||||
|
||||
let previous_words = index.word_docids.iter(&rtxn)?.remap_data_type::<Bytes>();
|
||||
let current_words = index.word_docids.iter(wtxn)?.remap_data_type::<Bytes>();
|
||||
for eob in merge_join_by(previous_words, current_words, |lhs, rhs| match (lhs, rhs) {
|
||||
(Ok((l, _)), Ok((r, _))) => l.cmp(r),
|
||||
(Err(_), _) | (_, Err(_)) => Ordering::Equal,
|
||||
}) {
|
||||
match eob {
|
||||
EitherOrBoth::Both(lhs, rhs) => {
|
||||
let (word, lhs_bytes) = lhs?;
|
||||
let (_, rhs_bytes) = rhs?;
|
||||
if lhs_bytes != rhs_bytes {
|
||||
word_fst_builder.register_word(DelAdd::Addition, word.as_ref())?;
|
||||
}
|
||||
}
|
||||
EitherOrBoth::Left(result) => {
|
||||
let (word, _) = result?;
|
||||
word_fst_builder.register_word(DelAdd::Deletion, word.as_ref())?;
|
||||
}
|
||||
EitherOrBoth::Right(result) => {
|
||||
let (word, _) = result?;
|
||||
word_fst_builder.register_word(DelAdd::Addition, word.as_ref())?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let (word_fst_mmap, prefix_data) = word_fst_builder.build(index, &rtxn)?;
|
||||
index.main.remap_types::<Str, Bytes>().put(wtxn, WORDS_FST_KEY, &word_fst_mmap)?;
|
||||
if let Some(PrefixData { prefixes_fst_mmap, prefix_delta }) = prefix_data {
|
||||
index.main.remap_types::<Str, Bytes>().put(
|
||||
wtxn,
|
||||
WORDS_PREFIXES_FST_KEY,
|
||||
&prefixes_fst_mmap,
|
||||
)?;
|
||||
Ok(Some(prefix_delta))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
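compute_word_fst diffs the word docids as seen before (read transaction) and after (write transaction) the update by walking the two sorted iterators in lockstep with merge_join_by. A self-contained sketch of that diffing idiom:

use itertools::{merge_join_by, EitherOrBoth};

// Walking two sorted key streams in lockstep classifies every key as
// removed (Left), added (Right), or present on both sides.
fn main() {
    let before = ["apple", "banana", "pear"];
    let after = ["banana", "cherry", "pear"];
    for eob in merge_join_by(before, after, |l, r| l.cmp(r)) {
        match eob {
            EitherOrBoth::Both(_, _) => (),                    // key unchanged
            EitherOrBoth::Left(w) => println!("deleted: {w}"), // apple
            EitherOrBoth::Right(w) => println!("added: {w}"),  // cherry
        }
    }
}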
#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_search")]
|
||||
fn compute_facet_search_database(
|
||||
index: &Index,
|
||||
wtxn: &mut RwTxn,
|
||||
global_fields_ids_map: GlobalFieldsIdsMap,
|
||||
) -> Result<()> {
|
||||
let rtxn = index.read_txn()?;
|
||||
let localized_attributes_rules = index.localized_attributes_rules(&rtxn)?;
|
||||
let mut facet_search_builder = FacetSearchBuilder::new(
|
||||
global_fields_ids_map,
|
||||
localized_attributes_rules.unwrap_or_default(),
|
||||
);
|
||||
|
||||
let previous_facet_id_string_docids = index
|
||||
.facet_id_string_docids
|
||||
.iter(&rtxn)?
|
||||
.remap_data_type::<DecodeIgnore>()
|
||||
.filter(|r| r.as_ref().map_or(true, |(k, _)| k.level == 0));
|
||||
let current_facet_id_string_docids = index
|
||||
.facet_id_string_docids
|
||||
.iter(wtxn)?
|
||||
.remap_data_type::<DecodeIgnore>()
|
||||
.filter(|r| r.as_ref().map_or(true, |(k, _)| k.level == 0));
|
||||
for eob in merge_join_by(
|
||||
previous_facet_id_string_docids,
|
||||
current_facet_id_string_docids,
|
||||
|lhs, rhs| match (lhs, rhs) {
|
||||
(Ok((l, _)), Ok((r, _))) => l.cmp(r),
|
||||
(Err(_), _) | (_, Err(_)) => Ordering::Equal,
|
||||
},
|
||||
) {
|
||||
match eob {
|
||||
EitherOrBoth::Both(lhs, rhs) => {
|
||||
let (_, _) = lhs?;
|
||||
let (_, _) = rhs?;
|
||||
}
|
||||
EitherOrBoth::Left(result) => {
|
||||
let (key, _) = result?;
|
||||
facet_search_builder.register_from_key(DelAdd::Deletion, key)?;
|
||||
}
|
||||
EitherOrBoth::Right(result) => {
|
||||
let (key, _) = result?;
|
||||
facet_search_builder.register_from_key(DelAdd::Addition, key)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
facet_search_builder.merge_and_write(index, wtxn, &rtxn)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_field_ids")]
|
||||
fn compute_facet_level_database(
|
||||
index: &Index,
|
||||
wtxn: &mut RwTxn,
|
||||
mut facet_field_ids_delta: FacetFieldIdsDelta,
|
||||
) -> Result<()> {
|
||||
for (fid, delta) in facet_field_ids_delta.consume_facet_string_delta() {
|
||||
let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string");
|
||||
let _entered = span.enter();
|
||||
match delta {
|
||||
super::merger::FacetFieldIdDelta::Bulk => {
|
||||
tracing::debug!(%fid, "bulk string facet processing");
|
||||
FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::String)
|
||||
.execute(wtxn)?
|
||||
}
|
||||
super::merger::FacetFieldIdDelta::Incremental(delta_data) => {
|
||||
tracing::debug!(%fid, len=%delta_data.len(), "incremental string facet processing");
|
||||
FacetsUpdateIncremental::new(
|
||||
index,
|
||||
FacetType::String,
|
||||
fid,
|
||||
delta_data,
|
||||
FACET_GROUP_SIZE,
|
||||
FACET_MIN_LEVEL_SIZE,
|
||||
FACET_MAX_GROUP_SIZE,
|
||||
)
|
||||
.execute(wtxn)?
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (fid, delta) in facet_field_ids_delta.consume_facet_number_delta() {
|
||||
let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number");
|
||||
let _entered = span.enter();
|
||||
match delta {
|
||||
super::merger::FacetFieldIdDelta::Bulk => {
|
||||
tracing::debug!(%fid, "bulk number facet processing");
|
||||
FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::Number)
|
||||
.execute(wtxn)?
|
||||
}
|
||||
super::merger::FacetFieldIdDelta::Incremental(delta_data) => {
|
||||
tracing::debug!(%fid, len=%delta_data.len(), "incremental number facet processing");
|
||||
FacetsUpdateIncremental::new(
|
||||
index,
|
||||
FacetType::Number,
|
||||
fid,
|
||||
delta_data,
|
||||
FACET_GROUP_SIZE,
|
||||
FACET_MIN_LEVEL_SIZE,
|
||||
FACET_MAX_GROUP_SIZE,
|
||||
)
|
||||
.execute(wtxn)?
|
||||
}
|
||||
}
|
||||
debug_assert!(crate::update::facet::sanity_checks(
|
||||
index,
|
||||
wtxn,
|
||||
fid,
|
||||
FacetType::Number,
|
||||
FACET_GROUP_SIZE as usize,
|
||||
FACET_MIN_LEVEL_SIZE as usize,
|
||||
FACET_MAX_GROUP_SIZE as usize,
|
||||
)
|
||||
.is_ok());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
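The `debug_assert!(...is_ok())` above is the debug-only validation idiom: the whole expression is compiled out of release builds (where debug assertions are disabled), so an expensive tree-consistency check costs nothing in production. A self-contained sketch:

// `expensive_check` stands in for a full facet-tree traversal.
fn expensive_check() -> Result<(), String> {
    Ok(())
}

fn main() {
    // Evaluated only when debug assertions are enabled.
    debug_assert!(expensive_check().is_ok());
}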
/// Returns the primary key that has already been set for this index or the
/// one we will guess by searching for the first key that contains "id" as a substring,
/// and whether the primary key changed
/// TODO move this elsewhere
pub fn retrieve_or_guess_primary_key<'a>(
    rtxn: &'a RoTxn<'a>,
    index: &Index,
    new_fields_ids_map: &mut FieldsIdsMap,
    primary_key_from_op: Option<&'a str>,
    first_document: Option<RawMap<'a, FxBuildHasher>>,
) -> Result<StdResult<(PrimaryKey<'a>, bool), UserError>> {
    // make sure that we have a declared primary key, either fetching it from the index or attempting to guess it.

    // do we have an existing declared primary key?
    let (primary_key, has_changed) = if let Some(primary_key_from_db) = index.primary_key(rtxn)? {
        // did we request a primary key in the operation?
        match primary_key_from_op {
            // we did, and it is different from the DB one
            Some(primary_key_from_op) if primary_key_from_op != primary_key_from_db => {
                return Ok(Err(UserError::PrimaryKeyCannotBeChanged(
                    primary_key_from_db.to_string(),
                )));
            }
            _ => (primary_key_from_db, false),
        }
    } else {
        // no primary key in the DB => let's set one
        // did we request a primary key in the operation?
        let primary_key = if let Some(primary_key_from_op) = primary_key_from_op {
            // set primary key from operation
            primary_key_from_op
        } else {
            // guess primary key
            let first_document = match first_document {
                Some(document) => document,
                // previous indexer when no pk is set + we send an empty payload => index_primary_key_no_candidate_found
                None => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)),
            };

            let guesses: Result<Vec<&str>> = first_document
                .keys()
                .filter_map(|name| {
                    let Some(_) = new_fields_ids_map.insert(name) else {
                        return Some(Err(UserError::AttributeLimitReached.into()));
                    };
                    name.to_lowercase().ends_with(DEFAULT_PRIMARY_KEY).then_some(Ok(name))
                })
                .collect();

            let mut guesses = guesses?;

            // sort the keys in lexicographical order, so that fields are always in the same order.
            guesses.sort_unstable();

            match guesses.as_slice() {
                [] => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)),
                [name] => {
                    tracing::info!("Primary key was not specified in index. Inferred to '{name}'");
                    *name
                }
                multiple => {
                    return Ok(Err(UserError::MultiplePrimaryKeyCandidatesFound {
                        candidates: multiple
                            .iter()
                            .map(|candidate| candidate.to_string())
                            .collect(),
                    }))
                }
            }
        };
        (primary_key, true)
    };

    match PrimaryKey::new_or_insert(primary_key, new_fields_ids_map) {
        Ok(primary_key) => Ok(Ok((primary_key, has_changed))),
        Err(err) => Ok(Err(err)),
    }
}

fn request_threads() -> &'static ThreadPoolNoAbort {
    static REQUEST_THREADS: OnceLock<ThreadPoolNoAbort> = OnceLock::new();

    REQUEST_THREADS.get_or_init(|| {
        ThreadPoolNoAbortBuilder::new()
            .num_threads(crate::vector::REQUEST_PARALLELISM)
            .thread_name(|index| format!("embedding-request-{index}"))
            .build()
            .unwrap()
    })
}

@@ -1,189 +0,0 @@
use std::sync::atomic::AtomicBool;

use hashbrown::HashMap;
use heed::RwTxn;
use rand::SeedableRng as _;
use time::OffsetDateTime;

use super::super::channel::*;
use crate::documents::PrimaryKey;
use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
use crate::index::IndexEmbeddingConfig;
use crate::update::settings::InnerIndexSettings;
use crate::vector::{ArroyWrapper, Embedder, EmbeddingConfigs, Embeddings};
use crate::{Error, Index, InternalError, Result};

pub(super) fn write_to_db(
    mut writer_receiver: WriterBbqueueReceiver<'_>,
    finished_extraction: &AtomicBool,
    index: &Index,
    wtxn: &mut RwTxn<'_>,
    arroy_writers: &HashMap<u8, (&str, &Embedder, ArroyWrapper, usize)>,
) -> Result<()> {
    // Used by the ArroySetVector to copy the embedding into an
    // aligned memory area, required by arroy to accept a new vector.
    let mut aligned_embedding = Vec::new();
    let span = tracing::trace_span!(target: "indexing::write_db", "all");
    let _entered = span.enter();
    let span = tracing::trace_span!(target: "indexing::write_db", "post_merge");
    let mut _entered_post_merge = None;
    while let Some(action) = writer_receiver.recv_action() {
        if _entered_post_merge.is_none()
            && finished_extraction.load(std::sync::atomic::Ordering::Relaxed)
        {
            _entered_post_merge = Some(span.enter());
        }

        match action {
            ReceiverAction::WakeUp => (),
            ReceiverAction::LargeEntry(LargeEntry { database, key, value }) => {
                let database_name = database.database_name();
                let database = database.database(index);
                if let Err(error) = database.put(wtxn, &key, &value) {
                    return Err(Error::InternalError(InternalError::StorePut {
                        database_name,
                        key: bstr::BString::from(&key[..]),
                        value_length: value.len(),
                        error,
                    }));
                }
            }
            ReceiverAction::LargeVectors(large_vectors) => {
                let LargeVectors { docid, embedder_id, .. } = large_vectors;
                let (_, _, writer, dimensions) =
                    arroy_writers.get(&embedder_id).expect("requested a missing embedder");
                let mut embeddings = Embeddings::new(*dimensions);
                for embedding in large_vectors.read_embeddings(*dimensions) {
                    embeddings.push(embedding.to_vec()).unwrap();
                }
                writer.del_items(wtxn, *dimensions, docid)?;
                writer.add_items(wtxn, docid, &embeddings)?;
            }
        }

        // Every time there is a message in the channel we search
        // for new entries in the BBQueue buffers.
        write_from_bbqueue(
            &mut writer_receiver,
            index,
            wtxn,
            arroy_writers,
            &mut aligned_embedding,
        )?;
    }
    write_from_bbqueue(&mut writer_receiver, index, wtxn, arroy_writers, &mut aligned_embedding)?;
    Ok(())
}

#[tracing::instrument(level = "trace", skip_all, target = "indexing::vectors")]
|
||||
pub(super) fn build_vectors<MSP>(
|
||||
index: &Index,
|
||||
wtxn: &mut RwTxn<'_>,
|
||||
index_embeddings: Vec<IndexEmbeddingConfig>,
|
||||
arroy_writers: &mut HashMap<u8, (&str, &Embedder, ArroyWrapper, usize)>,
|
||||
must_stop_processing: &MSP,
|
||||
) -> Result<()>
|
||||
where
|
||||
MSP: Fn() -> bool + Sync + Send,
|
||||
{
|
||||
if index_embeddings.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut rng = rand::rngs::StdRng::seed_from_u64(42);
|
||||
for (_index, (_embedder_name, _embedder, writer, dimensions)) in arroy_writers {
|
||||
let dimensions = *dimensions;
|
||||
writer.build_and_quantize(wtxn, &mut rng, dimensions, false, must_stop_processing)?;
|
||||
}
|
||||
|
||||
index.put_embedding_configs(wtxn, index_embeddings)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
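build_vectors seeds its RNG with a fixed value (42), so the randomized parts of the arroy build are reproducible across runs of the same input. A self-contained illustration of why a fixed seed gives determinism (using the rand crate the code above already depends on):

use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};

// Two generators seeded identically produce identical streams.
fn main() {
    let mut a = StdRng::seed_from_u64(42);
    let mut b = StdRng::seed_from_u64(42);
    assert_eq!(a.gen::<u64>(), b.gen::<u64>());
}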
pub(super) fn update_index(
    index: &Index,
    wtxn: &mut RwTxn<'_>,
    new_fields_ids_map: FieldIdMapWithMetadata,
    new_primary_key: Option<PrimaryKey<'_>>,
    embedders: EmbeddingConfigs,
    field_distribution: std::collections::BTreeMap<String, u64>,
    document_ids: roaring::RoaringBitmap,
) -> Result<()> {
    index.put_fields_ids_map(wtxn, new_fields_ids_map.as_fields_ids_map())?;
    if let Some(new_primary_key) = new_primary_key {
        index.put_primary_key(wtxn, new_primary_key.name())?;
    }
    let mut inner_index_settings = InnerIndexSettings::from_index(index, wtxn, Some(embedders))?;
    inner_index_settings.recompute_facets(wtxn, index)?;
    inner_index_settings.recompute_searchables(wtxn, index)?;
    index.put_field_distribution(wtxn, &field_distribution)?;
    index.put_documents_ids(wtxn, &document_ids)?;
    index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?;
    Ok(())
}

/// A function dedicated to managing all the available BBQueue frames.
///
/// It reads all the available frames, does the corresponding database operations,
/// and stops when no frames are available.
pub fn write_from_bbqueue(
    writer_receiver: &mut WriterBbqueueReceiver<'_>,
    index: &Index,
    wtxn: &mut RwTxn<'_>,
    arroy_writers: &HashMap<u8, (&str, &crate::vector::Embedder, ArroyWrapper, usize)>,
    aligned_embedding: &mut Vec<f32>,
) -> crate::Result<()> {
    while let Some(frame_with_header) = writer_receiver.recv_frame() {
        match frame_with_header.header() {
            EntryHeader::DbOperation(operation) => {
                let database_name = operation.database.database_name();
                let database = operation.database.database(index);
                let frame = frame_with_header.frame();
                match operation.key_value(frame) {
                    (key, Some(value)) => {
                        if let Err(error) = database.put(wtxn, key, value) {
                            return Err(Error::InternalError(InternalError::StorePut {
                                database_name,
                                key: key.into(),
                                value_length: value.len(),
                                error,
                            }));
                        }
                    }
                    (key, None) => match database.delete(wtxn, key) {
                        Ok(false) => {
                            unreachable!("We tried to delete an unknown key: {key:?}")
                        }
                        Ok(_) => (),
                        Err(error) => {
                            return Err(Error::InternalError(InternalError::StoreDeletion {
                                database_name,
                                key: key.into(),
                                error,
                            }));
                        }
                    },
                }
            }
            EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid }) => {
                for (_index, (_name, _embedder, writer, dimensions)) in arroy_writers {
                    let dimensions = *dimensions;
                    writer.del_items(wtxn, dimensions, docid)?;
                }
            }
            EntryHeader::ArroySetVectors(asvs) => {
                let ArroySetVectors { docid, embedder_id, .. } = asvs;
                let frame = frame_with_header.frame();
                let (_, _, writer, dimensions) =
                    arroy_writers.get(&embedder_id).expect("requested a missing embedder");
                let mut embeddings = Embeddings::new(*dimensions);
                let all_embeddings = asvs.read_all_embeddings_into_vec(frame, aligned_embedding);
                embeddings.append(all_embeddings.to_vec()).unwrap();
                writer.del_items(wtxn, *dimensions, docid)?;
                writer.add_items(wtxn, docid, &embeddings)?;
            }
        }
    }

    Ok(())
}
@@ -1,6 +1,6 @@
use std::cell::RefCell;

use hashbrown::{HashMap, HashSet};
use hashbrown::HashMap;
use heed::types::Bytes;
use heed::{Database, RoTxn};
use memmap2::Mmap;
@@ -12,6 +12,7 @@ use super::extract::{
    merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap,
    FacetKind, GeoExtractorData,
};
use crate::update::facet::new_incremental::FacetFieldIdChange;
use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result};

#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
@@ -100,12 +101,18 @@ pub fn merge_and_send_facet_docids<'extractor>(
    mut caches: Vec<BalancedCaches<'extractor>>,
    database: FacetDatabases,
    index: &Index,
    rtxn: &RoTxn,
    docids_sender: FacetDocidsSender,
) -> Result<FacetFieldIdsDelta> {
    let max_string_count = (index.facet_id_string_docids.len(rtxn)? / 500) as usize;
    let max_number_count = (index.facet_id_f64_docids.len(rtxn)? / 500) as usize;
    let max_string_count = max_string_count.clamp(1000, 100_000);
    let max_number_count = max_number_count.clamp(1000, 100_000);
    transpose_and_freeze_caches(&mut caches)?
        .into_par_iter()
        .map(|frozen| {
            let mut facet_field_ids_delta = FacetFieldIdsDelta::default();
            let mut facet_field_ids_delta =
                FacetFieldIdsDelta::new(max_string_count, max_number_count);
            let rtxn = index.read_txn()?;
            merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| {
                let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?;
@@ -126,7 +133,10 @@ pub fn merge_and_send_facet_docids<'extractor>(

            Ok(facet_field_ids_delta)
        })
        .reduce(|| Ok(FacetFieldIdsDelta::default()), |lhs, rhs| Ok(lhs?.merge(rhs?)))
        .reduce(
            || Ok(FacetFieldIdsDelta::new(max_string_count, max_number_count)),
            |lhs, rhs| Ok(lhs?.merge(rhs?)),
        )
}

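The budget computed above decides, per facet kind, how many individual changes may be tracked before falling back to a bulk rebuild: 1/500th of the current database size, clamped so tiny indexes still get a sensible floor and huge ones stay bounded. A self-contained sketch of that computation, mirroring the code above:

// Budget for incremental facet updates, derived from database size.
fn incremental_budget(db_len: u64) -> usize {
    ((db_len / 500) as usize).clamp(1000, 100_000)
}

// e.g. 100_000 entries -> budget 1000; 100_000_000 entries -> 100_000.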
pub struct FacetDatabases<'a> {
|
||||
@@ -162,115 +172,125 @@ pub enum FacetFieldIdDelta {
|
||||
}
|
||||
|
||||
impl FacetFieldIdDelta {
|
||||
fn push(&mut self, facet_value: &[u8], operation: FacetFieldIdOperation, db_size: usize) {
|
||||
fn push(&mut self, facet_value: &[u8], max_count: usize) {
|
||||
*self = match std::mem::replace(self, FacetFieldIdDelta::Bulk) {
|
||||
FacetFieldIdDelta::Bulk => FacetFieldIdDelta::Bulk,
|
||||
FacetFieldIdDelta::Incremental(mut v) => {
|
||||
if v.len() >= (db_size / 500) {
|
||||
if v.len() >= max_count {
|
||||
FacetFieldIdDelta::Bulk
|
||||
} else {
|
||||
v.push(FacetFieldIdChange { facet_value: facet_value.into(), operation });
|
||||
v.push(FacetFieldIdChange { facet_value: facet_value.into() });
|
||||
FacetFieldIdDelta::Incremental(v)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn merge(&mut self, rhs: Option<Self>, max_count: usize) {
|
||||
let Some(rhs) = rhs else {
|
||||
return;
|
||||
};
|
||||
*self = match (std::mem::replace(self, FacetFieldIdDelta::Bulk), rhs) {
|
||||
(FacetFieldIdDelta::Bulk, _) | (_, FacetFieldIdDelta::Bulk) => FacetFieldIdDelta::Bulk,
|
||||
(
|
||||
FacetFieldIdDelta::Incremental(mut left),
|
||||
FacetFieldIdDelta::Incremental(mut right),
|
||||
) => {
|
||||
if left.len() + right.len() >= max_count {
|
||||
FacetFieldIdDelta::Bulk
|
||||
} else {
|
||||
left.append(&mut right);
|
||||
FacetFieldIdDelta::Incremental(left)
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct FacetFieldIdChange {
|
||||
facet_value: Box<[u8]>,
|
||||
operation: FacetFieldIdOperation,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum FacetFieldIdOperation {
|
||||
/// The docids have been modified for an existing facet value
|
||||
///
|
||||
/// The modification must be propagated to upper levels, without changing the structure of the tree
|
||||
InPlace,
|
||||
/// A new value has been inserted
|
||||
///
|
||||
/// The modification must be propagated to upper levels, splitting nodes and adding new levels as necessary.
|
||||
Insert,
|
||||
/// An existing value has been deleted
|
||||
///
|
||||
/// The modification must be propagated to upper levels, merging nodes and removing levels as necessary.
|
||||
Remove,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct FacetFieldIdsDelta {
|
||||
/// The field ids that have been modified
|
||||
modified_facet_string_ids: HashMap<FieldId, FacetFieldIdDelta, rustc_hash::FxBuildHasher>,
|
||||
modified_facet_number_ids: HashMap<FieldId, FacetFieldIdDelta, rustc_hash::FxBuildHasher>,
|
||||
db_size: usize,
|
||||
max_string_count: usize,
|
||||
max_number_count: usize,
|
||||
}
|
||||
|
||||
impl FacetFieldIdsDelta {
|
||||
fn register_facet_string_id(
|
||||
&mut self,
|
||||
field_id: FieldId,
|
||||
facet_value: &[u8],
|
||||
operation: FacetFieldIdOperation,
|
||||
) {
|
||||
pub fn new(max_string_count: usize, max_number_count: usize) -> Self {
|
||||
Self {
|
||||
max_string_count,
|
||||
max_number_count,
|
||||
modified_facet_string_ids: Default::default(),
|
||||
modified_facet_number_ids: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
fn register_facet_string_id(&mut self, field_id: FieldId, facet_value: &[u8]) {
|
||||
self.modified_facet_string_ids
|
||||
.entry(field_id)
|
||||
.or_insert(FacetFieldIdDelta::Incremental(Default::default()))
|
||||
.push(facet_value, operation, self.db_size);
|
||||
.push(facet_value, self.max_string_count);
|
||||
}
|
||||
|
||||
fn register_facet_number_id(
|
||||
&mut self,
|
||||
field_id: FieldId,
|
||||
facet_value: &[u8],
|
||||
operation: FacetFieldIdOperation,
|
||||
) {
|
||||
fn register_facet_number_id(&mut self, field_id: FieldId, facet_value: &[u8]) {
|
||||
self.modified_facet_number_ids
|
||||
.entry(field_id)
|
||||
.or_insert(FacetFieldIdDelta::Incremental(Default::default()))
|
||||
.push(facet_value, operation, self.db_size);
|
||||
.push(facet_value, self.max_number_count);
|
||||
}
|
||||
|
||||
fn register_from_key(&mut self, key: &[u8], operation: FacetFieldIdOperation) {
|
||||
fn register_from_key(&mut self, key: &[u8]) {
|
||||
let (facet_kind, field_id, facet_value) = self.extract_key_data(key);
|
||||
match facet_kind {
|
||||
FacetKind::Number => self.register_facet_number_id(field_id, facet_value, operation),
|
||||
FacetKind::String => self.register_facet_string_id(field_id, facet_value, operation),
|
||||
match (facet_kind, facet_value) {
|
||||
(FacetKind::Number, Some(facet_value)) => {
|
||||
self.register_facet_number_id(field_id, facet_value)
|
||||
}
|
||||
(FacetKind::String, Some(facet_value)) => {
|
||||
self.register_facet_string_id(field_id, facet_value)
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
|
||||
fn extract_key_data(&self, key: &[u8]) -> (FacetKind, FieldId, &[u8]) {
|
||||
fn extract_key_data<'key>(&self, key: &'key [u8]) -> (FacetKind, FieldId, Option<&'key [u8]>) {
|
||||
let facet_kind = FacetKind::from(key[0]);
|
||||
let field_id = FieldId::from_be_bytes([key[1], key[2]]);
|
||||
let facet_value = &key[2..];
|
||||
let facet_value = if key.len() >= 4 {
|
||||
// level is also stored in the key at [3] (always 0)
|
||||
Some(&key[4..])
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
(facet_kind, field_id, facet_value)
|
||||
}
|
||||
|
||||
pub fn modified_facet_string_ids(&self) -> Option<Vec<FieldId>> {
|
||||
if self.modified_facet_string_ids.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(self.modified_facet_string_ids.iter().copied().collect())
|
||||
}
|
||||
pub fn consume_facet_string_delta(
|
||||
&mut self,
|
||||
) -> impl Iterator<Item = (FieldId, FacetFieldIdDelta)> + '_ {
|
||||
None.into_iter()
|
||||
// self.modified_facet_string_ids.drain()
|
||||
}
|
||||
|
||||
pub fn modified_facet_number_ids(&self) -> Option<Vec<FieldId>> {
|
||||
if self.modified_facet_number_ids.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(self.modified_facet_number_ids.iter().copied().collect())
|
||||
}
|
||||
pub fn consume_facet_number_delta(
|
||||
&mut self,
|
||||
) -> impl Iterator<Item = (FieldId, FacetFieldIdDelta)> + '_ {
|
||||
self.modified_facet_number_ids.drain()
|
||||
}
|
||||
|
||||
pub fn merge(mut self, rhs: Self) -> Self {
|
||||
let Self { modified_facet_number_ids, modified_facet_string_ids } = rhs;
|
||||
modified_facet_number_ids.into_iter().for_each(|fid| {
|
||||
self.modified_facet_number_ids.insert(fid);
|
||||
// rhs.max_xx_count is assumed to be equal to self.max_xx_count, and so gets unused
|
||||
let Self { modified_facet_number_ids, modified_facet_string_ids, .. } = rhs;
|
||||
modified_facet_number_ids.into_iter().for_each(|(fid, mut delta)| {
|
||||
let old_delta = self.modified_facet_number_ids.remove(&fid);
|
||||
delta.merge(old_delta, self.max_number_count);
|
||||
self.modified_facet_number_ids.insert(fid, delta);
|
||||
});
|
||||
modified_facet_string_ids.into_iter().for_each(|fid| {
|
||||
self.modified_facet_string_ids.insert(fid);
|
||||
modified_facet_string_ids.into_iter().for_each(|(fid, mut delta)| {
|
||||
let old_delta = self.modified_facet_string_ids.remove(&fid);
|
||||
delta.merge(old_delta, self.max_string_count);
|
||||
self.modified_facet_string_ids.insert(fid, delta);
|
||||
});
|
||||
self
|
||||
}
|
||||
|
||||
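The merge logic above saturates: once either side is Bulk, or the combined change list exceeds the budget, the whole delta collapses to a full rebuild marker and individual changes are discarded. A self-contained analogue of that saturating merge:

#[derive(Debug)]
enum Delta {
    Bulk,
    Incremental(Vec<u64>), // stand-in for tracked facet-value changes
}

// Combine two deltas; collapse to Bulk when the budget is exceeded.
fn merge(lhs: Delta, rhs: Delta, max: usize) -> Delta {
    match (lhs, rhs) {
        (Delta::Bulk, _) | (_, Delta::Bulk) => Delta::Bulk,
        (Delta::Incremental(mut l), Delta::Incremental(mut r)) => {
            if l.len() + r.len() >= max {
                Delta::Bulk
            } else {
                l.append(&mut r);
                Delta::Incremental(l)
            }
        }
    }
}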
@@ -25,7 +25,7 @@ impl WordPrefixDocids {
    fn new(
        database: Database<Bytes, CboRoaringBitmapCodec>,
        prefix_database: Database<Bytes, CboRoaringBitmapCodec>,
        grenad_parameters: &GrenadParameters,
        grenad_parameters: GrenadParameters,
    ) -> WordPrefixDocids {
        WordPrefixDocids {
            database,
@@ -161,7 +161,7 @@ impl WordPrefixIntegerDocids {
    fn new(
        database: Database<Bytes, CboRoaringBitmapCodec>,
        prefix_database: Database<Bytes, CboRoaringBitmapCodec>,
        grenad_parameters: &GrenadParameters,
        grenad_parameters: GrenadParameters,
    ) -> WordPrefixIntegerDocids {
        WordPrefixIntegerDocids {
            database,
@@ -311,7 +311,7 @@ pub fn compute_word_prefix_docids(
    index: &Index,
    prefix_to_compute: &BTreeSet<Prefix>,
    prefix_to_delete: &BTreeSet<Prefix>,
    grenad_parameters: &GrenadParameters,
    grenad_parameters: GrenadParameters,
) -> Result<()> {
    WordPrefixDocids::new(
        index.word_docids.remap_key_type(),
@@ -327,7 +327,7 @@ pub fn compute_exact_word_prefix_docids(
    index: &Index,
    prefix_to_compute: &BTreeSet<Prefix>,
    prefix_to_delete: &BTreeSet<Prefix>,
    grenad_parameters: &GrenadParameters,
    grenad_parameters: GrenadParameters,
) -> Result<()> {
    WordPrefixDocids::new(
        index.exact_word_docids.remap_key_type(),
@@ -343,7 +343,7 @@ pub fn compute_word_prefix_fid_docids(
    index: &Index,
    prefix_to_compute: &BTreeSet<Prefix>,
    prefix_to_delete: &BTreeSet<Prefix>,
    grenad_parameters: &GrenadParameters,
    grenad_parameters: GrenadParameters,
) -> Result<()> {
    WordPrefixIntegerDocids::new(
        index.word_fid_docids.remap_key_type(),
@@ -359,7 +359,7 @@ pub fn compute_word_prefix_position_docids(
    index: &Index,
    prefix_to_compute: &BTreeSet<Prefix>,
    prefix_to_delete: &BTreeSet<Prefix>,
    grenad_parameters: &GrenadParameters,
    grenad_parameters: GrenadParameters,
) -> Result<()> {
    WordPrefixIntegerDocids::new(
        index.word_position_docids.remap_key_type(),