Compare commits


24 Commits

Author SHA1 Message Date
50c6854964 Merge #4798
4798: Update version for the next release (v1.8.4) in Cargo.toml r=dureuill a=meili-bot

⚠️ This PR is automatically generated. Check the new version is the expected one and Cargo.lock has been updated before merging.

Co-authored-by: dureuill <dureuill@users.noreply.github.com>
2024-07-15 14:09:11 +00:00
f0f02e6412 Merge #4796
4796: Generate vectors in dumps r=dureuill a=dureuill

# Pull Request

## What does this PR do?
1. Add an Index::embeddings method to compute the embeddings of a document
2. Write generated vectors in dumps
3. Remove generated vectors when importing dumps
4. Cherry pick the `ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION` workaround so that the older CI can still build

## Manual Tests (TODO)

- [ ] Import a dump from a v1.8.3 into a v1.8.4 successfully
- [x] Import a dump from a v1.8.4 into a v1.8.4 successfully
- [x] Import a dump from a v1.8.4 into a v1.9.0 successfully
  - [x] generated vectors are not regenerated
  - [x] user provided vectors are still available
  - [x] generated vectors still have the correct value
  - [x] updating a document with generated vectors attempts to regenerate

Co-authored-by: Louis Dureuil <louis@meilisearch.com>
2024-07-15 13:26:30 +00:00
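Items 1–3 above describe a round trip for generated vectors. A minimal sketch, assuming serde_json, of the `_vectors` entry the dump writer injects for a document whose embeddings were generated; the embedder name `"default"`, the document fields, and the values are made up for the example:

```rust
// Sketch only: the shape injected under `_vectors.<embedder>` when dumping a document
// whose embeddings were generated by Meilisearch (hence `regenerate: true`).
use serde_json::{json, Value};

fn main() {
    let mut document = json!({ "id": 1, "title": "shazam!" });

    // In the real code the embeddings come from the new `Index::embeddings` method.
    let generated: Vec<Vec<f32>> = vec![vec![0.1, 0.2, 0.3]];

    let vectors = document
        .as_object_mut()
        .unwrap()
        .entry("_vectors".to_owned())
        .or_insert(Value::Object(Default::default()));

    if let Value::Object(vectors) = vectors {
        // User-provided entries are left alone; only missing ones are filled in.
        vectors.entry("default".to_owned()).or_insert_with(|| {
            json!({ "embeddings": generated, "regenerate": true })
        });
    }

    println!("{document:#}");
}
```

On import (item 3), entries of exactly this shape are stripped again so the embedder regenerates them, while user-provided vectors survive the round trip.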
43bf3ff4e0 Update version for the next release (v1.8.4) in Cargo.toml 2024-07-15 10:24:00 +00:00
8fe6d31e01 CI: Add ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION workaround to keep using Ubuntu 18.04 2024-07-15 12:04:14 +02:00
9ec209bbf4 When importing dumps, remove regenerate: true vectors items 2024-07-15 11:57:11 +02:00
9375b7bba5 Inject generated vectors in dumps 2024-07-15 11:56:39 +02:00
363a5cc590 Retrieve function from v1.9 to get embeddings in documents 2024-07-15 11:56:18 +02:00
7d69953267 Merge #4709
4709: Update version for the next release (v1.8.3) in Cargo.toml r=dureuill a=meili-bot

⚠️ This PR is automatically generated. Check the new version is the expected one and Cargo.lock has been updated before merging.

Co-authored-by: dureuill <dureuill@users.noreply.github.com>
2024-06-19 15:25:38 +00:00
7bd1b7ac43 Merge #4707
4707: Only spawn thread pool once r=irevoire a=dureuill

# Pull Request

## Related issue
Fixes #4692 

## What does this PR do?
- There was a rayon thread pool of 40 threads that would be spawned multiple times per indexing operation.
- Perhaps due to the sheer number of spawned threads, or to a leak in rayon thread pools, the system was unable to reclaim all the spawned threads at a sufficient rate.
- As a result, the stacks of these threads would accumulate and consume virtual memory, and eventually physical memory too.
- Fortunately, the pool can actually be created once and then always reused. This PR performs this change.


Co-authored-by: Louis Dureuil <louis@meilisearch.com>
2024-06-19 14:51:06 +00:00
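Conceptually, the fix is the "spawn the pool once" pattern; a minimal sketch with `std::sync::OnceLock` and a plain rayon pool. The real change (visible in the extractor diff further down) wraps the pool in milli's `ThreadPoolNoAbort`, and the function name and thread count here are illustrative only:

```rust
// Sketch only: a process-wide rayon pool created lazily, once, and then reused,
// instead of building a fresh 40-thread pool for every indexing operation.
use std::sync::OnceLock;

use rayon::{ThreadPool, ThreadPoolBuilder};

fn indexing_pool() -> &'static ThreadPool {
    static POOL: OnceLock<ThreadPool> = OnceLock::new();
    POOL.get_or_init(|| {
        ThreadPoolBuilder::new()
            .num_threads(40) // illustrative; the real pool sizes itself differently
            .thread_name(|i| format!("indexing-{i}"))
            .build()
            .expect("failed to build the indexing thread pool")
    })
}

fn main() {
    // Every call returns the same pool: threads are spawned exactly once per process,
    // not once per indexing operation.
    assert!(std::ptr::eq(indexing_pool(), indexing_pool()));
}
```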
1ff860e0a8 Fixes for Rust v1.79 2024-06-19 15:52:00 +02:00
5d2b172e79 Update version for the next release (v1.8.3) in Cargo.toml 2024-06-19 13:32:04 +00:00
e64d0a206e Don't bind request_threads() to a local variable 2024-06-19 15:23:11 +02:00
6254c7cee1 Only spawn the pool once 2024-06-19 15:17:46 +02:00
6c6c4732a1 Merge #4681
4681: Fix concurrency issue r=irevoire a=dureuill

# Pull Request

## Related issue
Fixes #4654 

## What does this PR do?
- Asynchronously drop permits


Co-authored-by: Louis Dureuil <louis@meilisearch.com>
2024-06-10 09:36:08 +00:00
3976fe660e Merge #4688
4688: Update version for the next release (v1.8.2) in Cargo.toml r=dureuill a=meili-bot

⚠️ This PR is automatically generated. Check the new version is the expected one and Cargo.lock has been updated before merging.

Co-authored-by: dureuill <dureuill@users.noreply.github.com>
2024-06-10 08:28:34 +00:00
50f8218a5d Asynchronously drop permits 2024-06-10 10:19:57 +02:00
19585f1a4f Update version for the next release (v1.8.2) in Cargo.toml 2024-06-10 07:59:36 +00:00
ba75d23bfe Merge #4648
4648: Update version for the next release (v1.8.1) in Cargo.toml r=ManyTheFish a=meili-bot

⚠️ This PR is automatically generated. Check the new version is the expected one and Cargo.lock has been updated before merging.

Co-authored-by: ManyTheFish <ManyTheFish@users.noreply.github.com>
2024-05-21 16:38:36 +00:00
7fbb3bf8e8 Update version for the next release (v1.8.1) in Cargo.toml 2024-05-21 15:13:03 +00:00
9066a446a3 Merge #4642
4642: Index the _geo fields when changing the setting while there are already documents in the DB r=ManyTheFish a=irevoire

# Pull Request

## Related issue
Fixes https://github.com/meilisearch/meilisearch/issues/4640
Fixes https://github.com/meilisearch/meilisearch/issues/4628

## What does this PR do?
- Add an integration test that first indexes the document and then changes the settings
- Fix `extract_geo_point` by detecting if the `_geo` field has been faceted in this setting change and index all documents

Co-authored-by: Tamo <tamo@meilisearch.com>
Co-authored-by: ManyTheFish <many@meilisearch.com>
2024-05-21 13:16:11 +00:00
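The decision the fix introduces can be sketched in isolation (the real predicate is `InnerIndexSettingsDiff::run_geo_indexing`, shown verbatim in the settings diff further down); the types here are simplified stand-ins, not milli's settings structs:

```rust
// Sketch only: "should geo extraction run?" based on the old and new settings.
type FieldId = u16;

#[derive(Clone, Copy, PartialEq)]
struct GeoSettings {
    /// `Some((lat_fid, lng_fid))` when `_geo` is filterable or sortable.
    geo_fields_ids: Option<(FieldId, FieldId)>,
}

fn run_geo_indexing(old: GeoSettings, new: GeoSettings, settings_update_only: bool) -> bool {
    // Re-run when the faceting status of `_geo` changed (the previously missed case),
    // or when documents are being indexed while `_geo` is faceted.
    old.geo_fields_ids != new.geo_fields_ids
        || (!settings_update_only && new.geo_fields_ids.is_some())
}

fn main() {
    let before = GeoSettings { geo_fields_ids: None };
    let after = GeoSettings { geo_fields_ids: Some((1, 2)) };
    // Documents were added first, then `_geo` became faceted: geo must be indexed now.
    assert!(run_geo_indexing(before, after, true));
}
```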
f762307838 Fix clippy 2024-05-21 13:44:20 +02:00
3e94a90722 Fixes 2024-05-21 13:39:46 +02:00
fc7e817221 Index geo points based on the settings differences 2024-05-20 12:27:26 +02:00
0f78703b85 add a test reproducing the bug 2024-05-20 10:58:08 +02:00
24 changed files with 321 additions and 130 deletions

View File

@@ -1,4 +1,6 @@
 name: Look for flaky tests
+env:
+  ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true
 on:
   workflow_dispatch:
   schedule:

View File

@@ -1,5 +1,6 @@
 name: Run the indexing fuzzer
+env:
+  ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true
 on:
   push:
     branches:

View File

@@ -15,6 +15,8 @@ jobs:
   debian:
     name: Publish debian packagge
+    env:
+      ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true
     runs-on: ubuntu-latest
     needs: check-version
     container:

View File

@@ -35,6 +35,8 @@ jobs:
   publish-linux:
     name: Publish binary for Linux
     runs-on: ubuntu-latest
+    env:
+      ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true
     needs: check-version
     container:
       # Use ubuntu-18.04 to compile with glibc 2.27
@@ -132,6 +134,8 @@ jobs:
     name: Publish binary for aarch64
     runs-on: ubuntu-latest
     needs: check-version
+    env:
+      ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true
     container:
       # Use ubuntu-18.04 to compile with glibc 2.27
       image: ubuntu:18.04

View File

@@ -21,6 +21,8 @@ jobs:
   test-linux:
     name: Tests on ubuntu-18.04
     runs-on: ubuntu-latest
+    env:
+      ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true
     container:
       # Use ubuntu-18.04 to compile with glibc 2.27, which are the production expectations
       image: ubuntu:18.04
@@ -77,6 +79,8 @@ jobs:
   test-all-features:
     name: Tests almost all features
     runs-on: ubuntu-latest
+    env:
+      ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true
     container:
       # Use ubuntu-18.04 to compile with glibc 2.27, which are the production expectations
       image: ubuntu:18.04
@@ -100,6 +104,8 @@ jobs:
   test-disabled-tokenization:
     name: Test disabled tokenization
+    env:
+      ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true
     runs-on: ubuntu-latest
    container:
       image: ubuntu:18.04
@@ -127,6 +133,8 @@ jobs:
   # We run tests in debug also, to make sure that the debug_assertions are hit
   test-debug:
     name: Run tests in debug
+    env:
+      ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true
     runs-on: ubuntu-latest
     container:
       # Use ubuntu-18.04 to compile with glibc 2.27, which are the production expectations

Cargo.lock (generated), 34 changed lines
View File

@@ -494,7 +494,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b"
 [[package]]
 name = "benchmarks"
-version = "1.8.0"
+version = "1.8.4"
 dependencies = [
  "anyhow",
  "bytes",
@@ -639,7 +639,7 @@ dependencies = [
 [[package]]
 name = "build-info"
-version = "1.8.0"
+version = "1.8.4"
 dependencies = [
  "anyhow",
  "time",
@@ -1539,7 +1539,7 @@ dependencies = [
 [[package]]
 name = "dump"
-version = "1.8.0"
+version = "1.8.4"
 dependencies = [
  "anyhow",
  "big_s",
@@ -1787,7 +1787,7 @@ dependencies = [
 [[package]]
 name = "file-store"
-version = "1.8.0"
+version = "1.8.4"
 dependencies = [
  "faux",
  "tempfile",
@@ -1810,7 +1810,7 @@ dependencies = [
 [[package]]
 name = "filter-parser"
-version = "1.8.0"
+version = "1.8.4"
 dependencies = [
  "insta",
  "nom",
@@ -1830,7 +1830,7 @@ dependencies = [
 [[package]]
 name = "flatten-serde-json"
-version = "1.8.0"
+version = "1.8.4"
 dependencies = [
  "criterion",
  "serde_json",
@@ -1948,7 +1948,7 @@ dependencies = [
 [[package]]
 name = "fuzzers"
-version = "1.8.0"
+version = "1.8.4"
 dependencies = [
  "arbitrary",
  "clap",
@@ -2442,7 +2442,7 @@ checksum = "206ca75c9c03ba3d4ace2460e57b189f39f43de612c2f85836e65c929701bb2d"
 [[package]]
 name = "index-scheduler"
-version = "1.8.0"
+version = "1.8.4"
 dependencies = [
  "anyhow",
  "big_s",
@@ -2638,7 +2638,7 @@ dependencies = [
 [[package]]
 name = "json-depth-checker"
-version = "1.8.0"
+version = "1.8.4"
 dependencies = [
  "criterion",
  "serde_json",
@@ -3275,7 +3275,7 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
 [[package]]
 name = "meili-snap"
-version = "1.8.0"
+version = "1.8.4"
 dependencies = [
  "insta",
  "md5",
@@ -3284,7 +3284,7 @@ dependencies = [
 [[package]]
 name = "meilisearch"
-version = "1.8.0"
+version = "1.8.4"
 dependencies = [
  "actix-cors",
  "actix-http",
@@ -3377,7 +3377,7 @@ dependencies = [
 [[package]]
 name = "meilisearch-auth"
-version = "1.8.0"
+version = "1.8.4"
 dependencies = [
  "base64 0.21.7",
  "enum-iterator",
@@ -3396,7 +3396,7 @@ dependencies = [
 [[package]]
 name = "meilisearch-types"
-version = "1.8.0"
+version = "1.8.4"
 dependencies = [
  "actix-web",
  "anyhow",
@@ -3426,7 +3426,7 @@ dependencies = [
 [[package]]
 name = "meilitool"
-version = "1.8.0"
+version = "1.8.4"
 dependencies = [
  "anyhow",
  "clap",
@@ -3465,7 +3465,7 @@ dependencies = [
 [[package]]
 name = "milli"
-version = "1.8.0"
+version = "1.8.4"
 dependencies = [
  "arroy",
  "big_s",
@@ -3906,7 +3906,7 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
 [[package]]
 name = "permissive-json-pointer"
-version = "1.8.0"
+version = "1.8.4"
 dependencies = [
  "big_s",
  "serde_json",
@@ -6074,7 +6074,7 @@ dependencies = [
 [[package]]
 name = "xtask"
-version = "1.8.0"
+version = "1.8.4"
 dependencies = [
  "anyhow",
  "build-info",

View File

@@ -22,7 +22,7 @@ members = [
 ]
 [workspace.package]
-version = "1.8.0"
+version = "1.8.4"
 authors = [
   "Quentin de Quelen <quentin@dequelen.me>",
   "Clément Renault <clement@meilisearch.com>",

View File

@@ -152,6 +152,7 @@ impl Settings<Unchecked> {
 }
 #[derive(Debug, Clone, Deserialize)]
+#[allow(dead_code)] // otherwise rustc complains that the fields go unused
 #[cfg_attr(test, derive(serde::Serialize))]
 #[serde(deny_unknown_fields)]
 #[serde(rename_all = "camelCase")]

View File

@@ -182,6 +182,7 @@ impl Settings<Unchecked> {
     }
 }
+#[allow(dead_code)] // otherwise rustc complains that the fields go unused
 #[derive(Debug, Clone, Deserialize)]
 #[cfg_attr(test, derive(serde::Serialize))]
 #[serde(deny_unknown_fields)]

View File

@@ -200,6 +200,7 @@ impl std::ops::Deref for IndexUid {
     }
 }
+#[allow(dead_code)] // otherwise rustc complains that the fields go unused
 #[derive(Debug)]
 #[cfg_attr(test, derive(serde::Serialize))]
 #[cfg_attr(test, serde(rename_all = "camelCase"))]

View File

@@ -914,8 +914,34 @@ impl IndexScheduler {
                 if self.must_stop_processing.get() {
                     return Err(Error::AbortedTask);
                 }
-                let (_id, doc) = ret?;
-                let document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc)?;
+                let (id, doc) = ret?;
+                let mut document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc)?;
+
+                'inject_vectors: {
+                    let embeddings = index.embeddings(&rtxn, id)?;
+
+                    if embeddings.is_empty() {
+                        break 'inject_vectors;
+                    }
+
+                    let vectors = document
+                        .entry("_vectors".to_owned())
+                        .or_insert(serde_json::Value::Object(Default::default()));
+
+                    let serde_json::Value::Object(vectors) = vectors else {
+                        break 'inject_vectors;
+                    };
+
+                    for (embedder_name, embeddings) in embeddings {
+                        vectors.entry(embedder_name).or_insert_with(|| {
+                            serde_json::json!({
+                                "embeddings": embeddings,
+                                "regenerate": true
+                            })
+                        });
+                    }
+                }
+
                 index_dumper.push_document(&document)?;
             }

View File

@@ -419,7 +419,41 @@ fn import_dump(
         let file = tempfile::tempfile()?;
         let mut builder = DocumentsBatchBuilder::new(BufWriter::new(file));
         for document in index_reader.documents()? {
-            builder.append_json_object(&document?)?;
+            let mut document = document?;
+
+            'remove_injected_vectors: {
+                let Some(vectors) = document.get_mut("_vectors") else {
+                    break 'remove_injected_vectors;
+                };
+
+                let Some(vectors) = vectors.as_object_mut() else { break 'remove_injected_vectors };
+
+                vectors.retain(|_embedder, embedding_object| {
+                    // don't touch values that aren't objects
+                    let Some(embedding_object) = embedding_object.as_object() else {
+                        return true;
+                    };
+
+                    let mut has_regenerate_true = false;
+                    for (field, value) in embedding_object {
+                        match (field.as_str(), value) {
+                            // detected regenerate : true
+                            // if we don't have any superfluous field, we'll remove the entire entry
+                            ("regenerate", serde_json::Value::Bool(true)) => {
+                                has_regenerate_true = true;
+                            }
+                            // ignore embeddings
+                            ("embeddings", _) => continue,
+                            // any other field: immediately retain the entry
+                            _ => return true,
+                        }
+                    }
+
+                    // retain the entry unless it has regenerate: true
+                    !has_regenerate_true
+                })
+            }
+
+            builder.append_json_object(&document)?;
         }
 
         // This flush the content of the batch builder.
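The retain predicate above is easy to exercise in isolation; a hedged, self-contained sketch assuming serde_json (the helper name `keep_entry` is made up for the example): an entry that carries only `embeddings` plus `regenerate: true` is dropped, anything else is kept.

```rust
// Sketch only: the "remove injected vectors" predicate on its own.
use serde_json::{json, Value};

fn keep_entry(entry: &Value) -> bool {
    let Some(obj) = entry.as_object() else { return true };
    let mut has_regenerate_true = false;
    for (field, value) in obj {
        match (field.as_str(), value) {
            ("regenerate", Value::Bool(true)) => has_regenerate_true = true,
            ("embeddings", _) => continue,
            // any other field means the entry was user-provided: keep it
            _ => return true,
        }
    }
    !has_regenerate_true
}

fn main() {
    // Injected at dump time, so removed on import and regenerated by the embedder.
    assert!(!keep_entry(&json!({ "embeddings": [[0.1, 0.2]], "regenerate": true })));
    // User-provided vectors are preserved.
    assert!(keep_entry(&json!({ "embeddings": [[0.1, 0.2]], "regenerate": false })));
    assert!(keep_entry(&json!([0.1, 0.2])));
}
```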

View File

@@ -40,8 +40,9 @@ pub struct Permit {
 impl Drop for Permit {
     fn drop(&mut self) {
+        let sender = self.sender.clone();
         // if the channel is closed then the whole instance is down
-        let _ = futures::executor::block_on(self.sender.send(()));
+        std::mem::drop(tokio::spawn(async move { sender.send(()).await }));
     }
 }
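The same pattern sketched in isolation with illustrative types (not the actual meilisearch `Permit`): cloning the sender and releasing it from a spawned task avoids blocking inside `Drop` on a tokio worker thread.

```rust
// Sketch only: dropping a "permit" asynchronously instead of calling block_on in Drop.
// Assumes tokio with the rt, macros and sync features enabled.
use tokio::sync::mpsc;

struct Permit {
    sender: mpsc::Sender<()>,
}

impl Drop for Permit {
    fn drop(&mut self) {
        let sender = self.sender.clone();
        // If the channel is already closed the send simply fails, which is fine:
        // it means the whole instance is shutting down.
        drop(tokio::spawn(async move { sender.send(()).await }));
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(1);
    drop(Permit { sender: tx });
    // The spawned task eventually delivers the release message.
    assert!(rx.recv().await.is_some());
}
```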

View File

@@ -117,3 +117,69 @@ async fn geo_bounding_box_with_string_and_number() {
     )
     .await;
 }
+
+#[actix_rt::test]
+async fn bug_4640() {
+    // https://github.com/meilisearch/meilisearch/issues/4640
+    let server = Server::new().await;
+    let index = server.index("test");
+    let documents = DOCUMENTS.clone();
+    index.add_documents(documents, None).await;
+    index.update_settings_filterable_attributes(json!(["_geo"])).await;
+    let (ret, _code) = index.update_settings_sortable_attributes(json!(["_geo"])).await;
+    index.wait_task(ret.uid()).await;
+
+    // Sort the document with the second one first
+    index
+        .search(
+            json!({
+                "sort": ["_geoPoint(45.4777599, 9.1967508):asc"],
+            }),
+            |response, code| {
+                assert_eq!(code, 200, "{}", response);
+                snapshot!(json_string!(response, { ".processingTimeMs" => "[time]" }), @r###"
+                {
+                  "hits": [
+                    {
+                      "id": 2,
+                      "name": "La Bella Italia",
+                      "address": "456 Elm Street, Townsville",
+                      "type": "Italian",
+                      "rating": 9,
+                      "_geo": {
+                        "lat": "45.4777599",
+                        "lng": "9.1967508"
+                      }
+                    },
+                    {
+                      "id": 1,
+                      "name": "Taco Truck",
+                      "address": "444 Salsa Street, Burritoville",
+                      "type": "Mexican",
+                      "rating": 9,
+                      "_geo": {
+                        "lat": 34.0522,
+                        "lng": -118.2437
+                      },
+                      "_geoDistance": 9714063
+                    },
+                    {
+                      "id": 3,
+                      "name": "Crêpe Truck",
+                      "address": "2 Billig Avenue, Rouenville",
+                      "type": "French",
+                      "rating": 10
+                    }
+                  ],
+                  "query": "",
+                  "processingTimeMs": "[time]",
+                  "limit": 20,
+                  "offset": 0,
+                  "estimatedTotalHits": 3
+                }
+                "###);
+            },
+        )
+        .await;
+}

View File

@@ -74,10 +74,10 @@ csv = "1.3.0"
 candle-core = { version = "0.4.1" }
 candle-transformers = { version = "0.4.1" }
 candle-nn = { version = "0.4.1" }
-tokenizers = { git = "https://github.com/huggingface/tokenizers.git", tag = "v0.15.2", version = "0.15.2", default_features = false, features = [
+tokenizers = { git = "https://github.com/huggingface/tokenizers.git", tag = "v0.15.2", version = "0.15.2", default-features = false, features = [
     "onig",
 ] }
-hf-hub = { git = "https://github.com/dureuill/hf-hub.git", branch = "rust_tls", default_features = false, features = [
+hf-hub = { git = "https://github.com/dureuill/hf-hub.git", branch = "rust_tls", default-features = false, features = [
     "online",
 ] }
 tiktoken-rs = "0.5.8"

View File

@@ -22,7 +22,7 @@ use crate::heed_codec::{
 };
 use crate::order_by_map::OrderByMap;
 use crate::proximity::ProximityPrecision;
-use crate::vector::EmbeddingConfig;
+use crate::vector::{Embedding, EmbeddingConfig};
 use crate::{
     default_criteria, CboRoaringBitmapCodec, Criterion, DocumentId, ExternalDocumentsIds,
     FacetDistribution, FieldDistribution, FieldId, FieldIdWordCountCodec, GeoPoint, ObkvCodec,
@@ -1516,6 +1516,42 @@ impl Index {
             .unwrap_or_default())
     }
+
+    pub fn embeddings(
+        &self,
+        rtxn: &RoTxn<'_>,
+        docid: DocumentId,
+    ) -> Result<BTreeMap<String, Vec<Embedding>>> {
+        let mut res = BTreeMap::new();
+        for row in self.embedder_category_id.iter(rtxn)? {
+            let (embedder_name, embedder_id) = row?;
+            let embedder_id = (embedder_id as u16) << 8;
+            let mut embeddings = Vec::new();
+            'vectors: for i in 0..=u8::MAX {
+                let reader = arroy::Reader::open(rtxn, embedder_id | (i as u16), self.vector_arroy)
+                    .map(Some)
+                    .or_else(|e| match e {
+                        arroy::Error::MissingMetadata => Ok(None),
+                        e => Err(e),
+                    })
+                    .transpose();
+
+                let Some(reader) = reader else {
+                    break 'vectors;
+                };
+
+                let embedding = reader?.item_vector(rtxn, docid)?;
+                if let Some(embedding) = embedding {
+                    embeddings.push(embedding)
+                } else {
+                    break 'vectors;
+                }
+            }
+
+            res.insert(embedder_name.to_owned(), embeddings);
+        }
+
+        Ok(res)
+    }
+
     pub(crate) fn put_search_cutoff(&self, wtxn: &mut RwTxn<'_>, cutoff: u64) -> heed::Result<()> {
         self.main.remap_types::<Str, BEU64>().put(wtxn, main_key::SEARCH_CUTOFF, &cutoff)
     }
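A hedged usage sketch for the new method (names are illustrative): given a read transaction and an internal document id, it returns the stored vectors grouped by embedder name, which is exactly what the dump task above consumes.

```rust
// Sketch only: reading back the stored vectors for one document via `Index::embeddings`.
// Assumes a `milli::Index` and an internal `DocumentId` are available to the caller.
fn print_embeddings(index: &milli::Index, docid: milli::DocumentId) -> milli::Result<()> {
    let rtxn = index.read_txn()?;
    // BTreeMap<String, Vec<Embedding>>: one entry per embedder, possibly several vectors each.
    let embeddings = index.embeddings(&rtxn, docid)?;
    for (embedder_name, vectors) in embeddings {
        println!("{embedder_name}: {} vector(s)", vectors.len());
    }
    Ok(())
}
```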

View File

@@ -22,7 +22,7 @@ pub enum SearchEvents {
     RankingRuleStartIteration { ranking_rule_idx: usize, universe_len: u64 },
     RankingRuleNextBucket { ranking_rule_idx: usize, universe_len: u64, bucket_len: u64 },
     RankingRuleSkipBucket { ranking_rule_idx: usize, bucket_len: u64 },
-    RankingRuleEndIteration { ranking_rule_idx: usize, universe_len: u64 },
+    RankingRuleEndIteration { ranking_rule_idx: usize },
     ExtendResults { new: Vec<u32> },
     ProximityGraph { graph: RankingRuleGraph<ProximityGraph> },
     ProximityPaths { paths: Vec<Vec<Interned<ProximityCondition>>> },
@@ -123,12 +123,9 @@ impl SearchLogger<QueryGraph> for VisualSearchLogger {
         &mut self,
         ranking_rule_idx: usize,
         _ranking_rule: &dyn RankingRule<QueryGraph>,
-        universe: &RoaringBitmap,
+        _universe: &RoaringBitmap,
     ) {
-        self.events.push(SearchEvents::RankingRuleEndIteration {
-            ranking_rule_idx,
-            universe_len: universe.len(),
-        });
+        self.events.push(SearchEvents::RankingRuleEndIteration { ranking_rule_idx });
         self.location.pop();
     }
     fn add_to_results(&mut self, docids: &[u32]) {
@@ -326,7 +323,7 @@ impl<'ctx> DetailedLoggerFinish<'ctx> {
                 assert!(ranking_rule_idx == self.rr_action_counter.len() - 1);
                 self.write_skip_bucket(bucket_len)?;
             }
-            SearchEvents::RankingRuleEndIteration { ranking_rule_idx, universe_len: _ } => {
+            SearchEvents::RankingRuleEndIteration { ranking_rule_idx } => {
                 assert!(ranking_rule_idx == self.rr_action_counter.len() - 1);
                 self.write_end_iteration()?;
             }

View File

@@ -45,7 +45,6 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
     obkv_documents: grenad::Reader<R>,
     indexer: GrenadParameters,
     settings_diff: &InnerIndexSettingsDiff,
-    geo_fields_ids: Option<(FieldId, FieldId)>,
 ) -> Result<ExtractedFacetValues> {
     puffin::profile_function!();
@@ -127,12 +126,18 @@
                 add_exists.insert(document);
             }
-            let geo_support =
-                geo_fields_ids.map_or(false, |(lat, lng)| field_id == lat || field_id == lng);
+            let del_geo_support = settings_diff
+                .old
+                .geo_fields_ids
+                .map_or(false, |(lat, lng)| field_id == lat || field_id == lng);
+            let add_geo_support = settings_diff
+                .new
+                .geo_fields_ids
+                .map_or(false, |(lat, lng)| field_id == lat || field_id == lng);
             let del_filterable_values =
-                del_value.map(|value| extract_facet_values(&value, geo_support));
+                del_value.map(|value| extract_facet_values(&value, del_geo_support));
             let add_filterable_values =
-                add_value.map(|value| extract_facet_values(&value, geo_support));
+                add_value.map(|value| extract_facet_values(&value, add_geo_support));
             // Those closures are just here to simplify things a bit.
             let mut insert_numbers_diff = |del_numbers, add_numbers| {

View File

@@ -8,6 +8,7 @@ use super::helpers::{create_writer, writer_into_reader, GrenadParameters};
 use crate::error::GeoError;
 use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
 use crate::update::index_documents::extract_finite_float_from_value;
+use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff};
 use crate::{FieldId, InternalError, Result};
 /// Extracts the geographical coordinates contained in each document under the `_geo` field.
@@ -18,7 +19,7 @@ pub fn extract_geo_points<R: io::Read + io::Seek>(
     obkv_documents: grenad::Reader<R>,
     indexer: GrenadParameters,
     primary_key_id: FieldId,
-    (lat_fid, lng_fid): (FieldId, FieldId),
+    settings_diff: &InnerIndexSettingsDiff,
 ) -> Result<grenad::Reader<BufReader<File>>> {
     puffin::profile_function!();
@@ -40,23 +41,12 @@ pub fn extract_geo_points<R: io::Read + io::Seek>(
             serde_json::from_slice(document_id).unwrap()
         };
-        // first we get the two fields
-        match (obkv.get(lat_fid), obkv.get(lng_fid)) {
-            (Some(lat), Some(lng)) => {
-                let deladd_lat_obkv = KvReaderDelAdd::new(lat);
-                let deladd_lng_obkv = KvReaderDelAdd::new(lng);
-
-                // then we extract the values
-                let del_lat_lng = deladd_lat_obkv
-                    .get(DelAdd::Deletion)
-                    .zip(deladd_lng_obkv.get(DelAdd::Deletion))
-                    .map(|(lat, lng)| extract_lat_lng(lat, lng, document_id))
-                    .transpose()?;
-                let add_lat_lng = deladd_lat_obkv
-                    .get(DelAdd::Addition)
-                    .zip(deladd_lng_obkv.get(DelAdd::Addition))
-                    .map(|(lat, lng)| extract_lat_lng(lat, lng, document_id))
-                    .transpose()?;
+        // extract old version
+        let del_lat_lng =
+            extract_lat_lng(&obkv, &settings_diff.old, DelAdd::Deletion, document_id)?;
+        // extract new version
+        let add_lat_lng =
+            extract_lat_lng(&obkv, &settings_diff.new, DelAdd::Addition, document_id)?;
 
         if del_lat_lng != add_lat_lng {
             let mut obkv = KvWriterDelAdd::memory();
@@ -74,21 +64,31 @@ pub fn extract_geo_points<R: io::Read + io::Seek>(
             writer.insert(docid_bytes, bytes)?;
         }
     }
-            (None, Some(_)) => {
-                return Err(GeoError::MissingLatitude { document_id: document_id() }.into())
-            }
-            (Some(_), None) => {
-                return Err(GeoError::MissingLongitude { document_id: document_id() }.into())
-            }
-            (None, None) => (),
-        }
-    }
     writer_into_reader(writer)
 }
 
 /// Extract the finite floats lat and lng from two bytes slices.
-fn extract_lat_lng(lat: &[u8], lng: &[u8], document_id: impl Fn() -> Value) -> Result<[f64; 2]> {
+fn extract_lat_lng(
+    document: &obkv::KvReader<FieldId>,
+    settings: &InnerIndexSettings,
+    deladd: DelAdd,
+    document_id: impl Fn() -> Value,
+) -> Result<Option<[f64; 2]>> {
+    match settings.geo_fields_ids {
+        Some((lat_fid, lng_fid)) => {
+            let lat = document.get(lat_fid).map(KvReaderDelAdd::new).and_then(|r| r.get(deladd));
+            let lng = document.get(lng_fid).map(KvReaderDelAdd::new).and_then(|r| r.get(deladd));
+            let (lat, lng) = match (lat, lng) {
+                (Some(lat), Some(lng)) => (lat, lng),
+                (Some(_), None) => {
+                    return Err(GeoError::MissingLatitude { document_id: document_id() }.into())
+                }
+                (None, Some(_)) => {
+                    return Err(GeoError::MissingLongitude { document_id: document_id() }.into())
+                }
+                (None, None) => return Ok(None),
+            };
+
             let lat = extract_finite_float_from_value(
                 serde_json::from_slice(lat).map_err(InternalError::SerdeJson)?,
             )
@@ -98,6 +98,8 @@ fn extract_lat_lng(lat: &[u8], lng: &[u8], document_id: impl Fn() -> Value) -> Result<[f64; 2]> {
                 serde_json::from_slice(lng).map_err(InternalError::SerdeJson)?,
             )
             .map_err(|lng| GeoError::BadLongitude { document_id: document_id(), value: lng })?;
-    Ok([lat, lng])
+            Ok(Some([lat, lng]))
+        }
+        None => Ok(None),
+    }
 }
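The refactor above makes geo extraction settings-aware, but the core idea stays the same and can be stated in isolation: resolve the (possibly different) old and new coordinates for a document and only emit work when they differ. A hedged, simplified sketch:

```rust
// Sketch only: "emit a geo update only when the coordinates changed",
// with Option<[f64; 2]> standing in for the del/add values extracted above.
fn geo_update(
    del: Option<[f64; 2]>,
    add: Option<[f64; 2]>,
) -> Option<(Option<[f64; 2]>, Option<[f64; 2]>)> {
    // `None` means the document had/has no `_geo` point for that side of the diff.
    if del != add {
        // Something to delete and/or something to add downstream.
        Some((del, add))
    } else {
        // Unchanged: nothing is written for this document.
        None
    }
}

fn main() {
    // Newly faceted `_geo` on an existing document: index it.
    assert!(geo_update(None, Some([45.47, 9.19])).is_some());
    // Unchanged coordinates: skip.
    assert!(geo_update(Some([45.47, 9.19]), Some([45.47, 9.19])).is_none());
}
```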

View File

@@ -11,7 +11,7 @@ mod extract_word_position_docids;
 use std::fs::File;
 use std::io::BufReader;
-use std::sync::Arc;
+use std::sync::{Arc, OnceLock};
 use crossbeam_channel::Sender;
 use rayon::prelude::*;
@@ -31,7 +31,7 @@ use self::extract_word_position_docids::extract_word_position_docids;
 use super::helpers::{as_cloneable_grenad, CursorClonableMmap, GrenadParameters};
 use super::{helpers, TypedChunk};
 use crate::update::settings::InnerIndexSettingsDiff;
-use crate::{FieldId, Result, ThreadPoolNoAbortBuilder};
+use crate::{FieldId, Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
 /// Extract data for each databases from obkv documents in parallel.
 /// Send data in grenad file over provided Sender.
@@ -43,7 +43,6 @@ pub(crate) fn data_from_obkv_documents(
     indexer: GrenadParameters,
     lmdb_writer_sx: Sender<Result<TypedChunk>>,
     primary_key_id: FieldId,
-    geo_fields_ids: Option<(FieldId, FieldId)>,
     settings_diff: Arc<InnerIndexSettingsDiff>,
     max_positions_per_attributes: Option<u32>,
 ) -> Result<()> {
@@ -72,7 +71,6 @@
                 indexer,
                 lmdb_writer_sx.clone(),
                 primary_key_id,
-                geo_fields_ids,
                 settings_diff.clone(),
                 max_positions_per_attributes,
             )
@@ -215,6 +213,18 @@ fn run_extraction_task<FE, FS, M>(
     })
 }
+
+fn request_threads() -> &'static ThreadPoolNoAbort {
+    static REQUEST_THREADS: OnceLock<ThreadPoolNoAbort> = OnceLock::new();
+
+    REQUEST_THREADS.get_or_init(|| {
+        ThreadPoolNoAbortBuilder::new()
+            .num_threads(crate::vector::REQUEST_PARALLELISM)
+            .thread_name(|index| format!("embedding-request-{index}"))
+            .build()
+            .unwrap()
+    })
+}
+
 /// Extract chunked data and send it into lmdb_writer_sx sender:
 /// - documents
 fn send_original_documents_data(
@@ -229,11 +239,6 @@ fn send_original_documents_data(
     let documents_chunk_cloned = original_documents_chunk.clone();
     let lmdb_writer_sx_cloned = lmdb_writer_sx.clone();
-    let request_threads = ThreadPoolNoAbortBuilder::new()
-        .num_threads(crate::vector::REQUEST_PARALLELISM)
-        .thread_name(|index| format!("embedding-request-{index}"))
-        .build()?;
-
     if settings_diff.reindex_vectors() || !settings_diff.settings_update_only() {
         let settings_diff = settings_diff.clone();
         rayon::spawn(move || {
@@ -251,7 +256,7 @@
                         prompts,
                         indexer,
                         embedder.clone(),
-                        &request_threads,
+                        request_threads(),
                     ) {
                         Ok(results) => Some(results),
                         Err(error) => {
@@ -300,7 +305,6 @@ fn send_and_extract_flattened_documents_data(
     indexer: GrenadParameters,
     lmdb_writer_sx: Sender<Result<TypedChunk>>,
     primary_key_id: FieldId,
-    geo_fields_ids: Option<(FieldId, FieldId)>,
     settings_diff: Arc<InnerIndexSettingsDiff>,
     max_positions_per_attributes: Option<u32>,
 ) -> Result<(
@@ -310,12 +314,13 @@
     let flattened_documents_chunk =
         flattened_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;
-    if let Some(geo_fields_ids) = geo_fields_ids {
+    if settings_diff.run_geo_indexing() {
         let documents_chunk_cloned = flattened_documents_chunk.clone();
         let lmdb_writer_sx_cloned = lmdb_writer_sx.clone();
+        let settings_diff = settings_diff.clone();
         rayon::spawn(move || {
             let result =
-                extract_geo_points(documents_chunk_cloned, indexer, primary_key_id, geo_fields_ids);
+                extract_geo_points(documents_chunk_cloned, indexer, primary_key_id, &settings_diff);
             let _ = match result {
                 Ok(geo_points) => lmdb_writer_sx_cloned.send(Ok(TypedChunk::GeoPoints(geo_points))),
                 Err(error) => lmdb_writer_sx_cloned.send(Err(error)),
@@ -354,7 +359,6 @@ fn send_and_extract_flattened_documents_data(
         flattened_documents_chunk.clone(),
         indexer,
         &settings_diff,
-        geo_fields_ids,
     )?;
 
     // send fid_docid_facet_numbers_chunk to DB writer

View File

@@ -324,28 +324,6 @@ where
         // get the primary key field id
         let primary_key_id = settings_diff.new.fields_ids_map.id(&primary_key).unwrap();
-        // get the fid of the `_geo.lat` and `_geo.lng` fields.
-        let mut field_id_map = self.index.fields_ids_map(self.wtxn)?;
-
-        // self.index.fields_ids_map($a)? ==>> field_id_map
-        let geo_fields_ids = match field_id_map.id("_geo") {
-            Some(gfid) => {
-                let is_sortable = self.index.sortable_fields_ids(self.wtxn)?.contains(&gfid);
-                let is_filterable = self.index.filterable_fields_ids(self.wtxn)?.contains(&gfid);
-                // if `_geo` is faceted then we get the `lat` and `lng`
-                if is_sortable || is_filterable {
-                    let field_ids = field_id_map
-                        .insert("_geo.lat")
-                        .zip(field_id_map.insert("_geo.lng"))
-                        .ok_or(UserError::AttributeLimitReached)?;
-                    Some(field_ids)
-                } else {
-                    None
-                }
-            }
-            None => None,
-        };
-
         let pool_params = GrenadParameters {
             chunk_compression_type: self.indexer_config.chunk_compression_type,
             chunk_compression_level: self.indexer_config.chunk_compression_level,
@@ -412,7 +390,6 @@
             pool_params,
             lmdb_writer_sx.clone(),
             primary_key_id,
-            geo_fields_ids,
             settings_diff.clone(),
             max_positions_per_attributes,
         )

View File

@@ -48,7 +48,6 @@ pub struct Transform<'a, 'i> {
     fields_ids_map: FieldsIdsMap,
     indexer_settings: &'a IndexerConfig,
-    pub autogenerate_docids: bool,
     pub index_documents_method: IndexDocumentsMethod,
     available_documents_ids: AvailableDocumentsIds,
@@ -102,7 +101,7 @@ impl<'a, 'i> Transform<'a, 'i> {
         index: &'i Index,
         indexer_settings: &'a IndexerConfig,
         index_documents_method: IndexDocumentsMethod,
-        autogenerate_docids: bool,
+        _autogenerate_docids: bool,
     ) -> Result<Self> {
         // We must choose the appropriate merge function for when two or more documents
         // with the same user id must be merged or fully replaced in the same batch.
@@ -136,7 +135,6 @@ impl<'a, 'i> Transform<'a, 'i> {
             index,
             fields_ids_map: index.fields_ids_map(wtxn)?,
             indexer_settings,
-            autogenerate_docids,
             available_documents_ids: AvailableDocumentsIds::from_documents_ids(&documents_ids),
             original_sorter,
             flattened_sorter,

View File

@@ -1161,6 +1161,11 @@ impl InnerIndexSettingsDiff {
     pub fn settings_update_only(&self) -> bool {
         self.settings_update_only
     }
+
+    pub fn run_geo_indexing(&self) -> bool {
+        self.old.geo_fields_ids != self.new.geo_fields_ids
+            || (!self.settings_update_only && self.new.geo_fields_ids.is_some())
+    }
 }
 #[derive(Clone)]
@@ -1177,6 +1182,7 @@ pub(crate) struct InnerIndexSettings {
     pub proximity_precision: ProximityPrecision,
     pub embedding_configs: EmbeddingConfigs,
     pub existing_fields: HashSet<String>,
+    pub geo_fields_ids: Option<(FieldId, FieldId)>,
 }
 impl InnerIndexSettings {
@@ -1185,7 +1191,7 @@ impl InnerIndexSettings {
         let stop_words = stop_words.map(|sw| sw.map_data(Vec::from).unwrap());
         let allowed_separators = index.allowed_separators(rtxn)?;
         let dictionary = index.dictionary(rtxn)?;
-        let fields_ids_map = index.fields_ids_map(rtxn)?;
+        let mut fields_ids_map = index.fields_ids_map(rtxn)?;
         let user_defined_searchable_fields = index.user_defined_searchable_fields(rtxn)?;
         let user_defined_searchable_fields =
             user_defined_searchable_fields.map(|sf| sf.into_iter().map(String::from).collect());
@@ -1200,6 +1206,24 @@ impl InnerIndexSettings {
             .into_iter()
             .filter_map(|(field, count)| (count != 0).then_some(field))
             .collect();
+        // index.fields_ids_map($a)? ==>> fields_ids_map
+        let geo_fields_ids = match fields_ids_map.id("_geo") {
+            Some(gfid) => {
+                let is_sortable = index.sortable_fields_ids(rtxn)?.contains(&gfid);
+                let is_filterable = index.filterable_fields_ids(rtxn)?.contains(&gfid);
+                // if `_geo` is faceted then we get the `lat` and `lng`
+                if is_sortable || is_filterable {
+                    let field_ids = fields_ids_map
+                        .insert("_geo.lat")
+                        .zip(fields_ids_map.insert("_geo.lng"))
+                        .ok_or(UserError::AttributeLimitReached)?;
+                    Some(field_ids)
+                } else {
+                    None
+                }
+            }
+            None => None,
+        };
 
         Ok(Self {
             stop_words,
@@ -1214,6 +1238,7 @@
             proximity_precision,
             embedding_configs,
             existing_fields,
+            geo_fields_ids,
         })
     }

View File

@@ -21,7 +21,7 @@ reqwest = { version = "0.11.23", features = [
     "stream",
     "json",
     "rustls-tls",
-], default_features = false }
+], default-features = false }
 serde = { version = "1.0.195", features = ["derive"] }
 serde_json = "1.0.111"
 sha2 = "0.10.8"