Compare commits

...

74 Commits

Author SHA1 Message Date
226bcb2717 Do not create too many rayon tasks when processing the settings 2025-01-29 17:02:06 +01:00
cd58a71f57 panic on serde json 2025-01-29 10:13:02 +01:00
e0f446e4d3 Remove a log that would log too much 2025-01-28 21:31:01 +01:00
3bbad823e0 Refine the env variable and the max readers 2025-01-28 21:31:01 +01:00
b605549bf2 Do not create too many rayon tasks 2025-01-28 21:31:01 +01:00
6a1062edf5 Add more logs to see calls to the embedders 2025-01-28 21:31:01 +01:00
426ea5aa97 Accept the max readers param by env var and increase it 2025-01-28 21:31:00 +01:00
e20b91210d Merge #5276
5276: Fix the stuck indexation due to the internal BBQueue capacity r=curquiza a=Kerollmops

Fixes https://github.com/meilisearch/meilisearch/issues/5277. Reduce the maximum reserve grant in the BBQueue so we are never stuck.
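The fix amounts to bounding how much a producer may reserve in the queue at once. A minimal hedged sketch of that invariant, where the halving margin and the function name are assumptions for illustration rather than the actual Meilisearch bound:

```rust
/// A circular byte queue can only serve a contiguous reservation that fits in
/// the buffer, so a producer asking for more than the buffer can ever hold
/// waits forever. Clamping the maximum grant keeps every request satisfiable.
/// The `/ 2` safety margin is an assumption for this sketch, not the value
/// used by Meilisearch.
fn clamp_reserve_grant(requested: usize, buffer_capacity: usize) -> usize {
    requested.min(buffer_capacity / 2)
}
```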

Co-authored-by: Kerollmops <clement@meilisearch.com>
Co-authored-by: Louis Dureuil <louis@meilisearch.com>
Co-authored-by: Clément Renault <clement@meilisearch.com>
2025-01-23 13:41:34 +00:00
17478301ab Merge #5278
5278: Update version for the next release (v1.12.7) in Cargo.toml r=dureuill a=meili-bot

⚠️ This PR is automatically generated. Check the new version is the expected one and Cargo.lock has been updated before merging.

Co-authored-by: dureuill <dureuill@users.noreply.github.com>
2025-01-23 10:47:30 +00:00
968c9dff27 Update version for the next release (v1.12.7) in Cargo.toml 2025-01-23 10:17:23 +00:00
463553988c Support offline upgrade up to v1.12.7 2025-01-23 11:11:40 +01:00
c321fdb9c0 Comment the max grant of the bbqueue
Co-authored-by: Louis Dureuil <louis@meilisearch.com>
2025-01-23 11:09:20 +01:00
36b6e94b29 Give more RAM to bbqueue.
- bbqueue buffers used to have (5% * 2%) / num_threads
- they now have 5% / num_threads
2025-01-23 10:55:03 +01:00
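Assuming the percentages in the commit message above apply to the indexing memory budget, the capacity change can be sketched as follows (function and parameter names are illustrative, not the actual Meilisearch identifiers):

```rust
// Per-thread BBQueue buffer capacity before and after the change.
fn bbqueue_capacity_before(indexing_memory: usize, num_threads: usize) -> usize {
    (indexing_memory * 5 / 100) * 2 / 100 / num_threads
}

fn bbqueue_capacity_after(indexing_memory: usize, num_threads: usize) -> usize {
    indexing_memory * 5 / 100 / num_threads
}
```

With a 2 GiB budget and 16 threads, that is roughly 130 KiB per buffer before versus about 6.4 MiB after.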
34dea863e5 Reduce the maximum grant possible we can store in the BBQueue 2025-01-23 10:43:28 +01:00
ad9d8e10f2 Merge #5260
5260: Update version for the next release (v1.12.6) in Cargo.toml r=Kerollmops a=meili-bot

⚠️ This PR is automatically generated. Check the new version is the expected one and Cargo.lock has been updated before merging.

Co-authored-by: Kerollmops <Kerollmops@users.noreply.github.com>
2025-01-21 12:37:46 +00:00
f7f35ef37c Update version for the next release (v1.12.6) in Cargo.toml 2025-01-21 12:22:56 +00:00
c575d2693b Merge #5258
5258: Unify facet strings by their normalized value r=ManyTheFish a=dureuill

Fixes #5228: the "missing facet keys" issue.

- Before this PR, updating a document such that `"facet": "DUREUILL"` would become `"facet": "dureuill"` could cause the normalized facet value `dureuill` to be removed from the `field_id_docid_facet_strings` db.
- This PR makes sure to unify the intermediate representation of the facet strings by their field_id and **normalized** (and truncated) string value (see the sketch after this list).
- The introduced test only exercises one of the two facet distribution algorithms.
- We removed the panic when the facet string is not found and instead return the normalized string.
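A minimal sketch of what unifying by the normalized value looks like, where `normalize`, the omitted truncation, and the map shape are illustrative stand-ins for the actual milli helpers:

```rust
use std::collections::BTreeMap;

// Both the old and the new version of a document must produce the same
// intermediate key, otherwise the deletion and the addition target different
// entries in `field_id_docid_facet_strings`.
fn normalize(s: &str) -> String {
    // Stand-in for the real normalization; the actual code also truncates.
    s.to_lowercase()
}

fn facet_string_key(field_id: u16, raw: &str) -> (u16, String) {
    (field_id, normalize(raw))
}

fn main() {
    let mut intermediate: BTreeMap<(u16, String), &str> = BTreeMap::new();
    // "DUREUILL" and "dureuill" now collapse onto the same key.
    intermediate.insert(facet_string_key(0, "DUREUILL"), "DUREUILL");
    intermediate.insert(facet_string_key(0, "dureuill"), "dureuill");
    assert_eq!(intermediate.len(), 1);
}
```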

## Draft status

- [x] target release v1.12.6 branch and milestone
- [ ] ~consider meilitool offline upgrade to fix the corrupted dbs in the wild.~
   workaround: ~remove facets, then add them again... if your facet distribution is right.~ Just use a dump.
- [x] Add unit test demonstrating the issue fixed by this PR.

Co-authored-by: Louis Dureuil <louis@meilisearch.com>
Co-authored-by: Kerollmops <clement@meilisearch.com>
2025-01-21 11:02:33 +00:00
024e06f7e3 Do not panic when the facet string is not found 2025-01-21 12:01:26 +01:00
145fa3a8ff Add a test to check the facet casing is good 2025-01-21 11:42:25 +01:00
d3a7e10348 Unify facet strings by their normalized value 2025-01-21 00:11:50 +01:00
1c78447226 Merge #5246
5246: Fix dump import r=Kerollmops a=dureuill

- Fix: handle the change of format of the update files
  - Correctly handle update files as a JSON stream rather than obkv when exporting a dump with enqueued tasks
  - Correctly recreate update files as a JSON stream rather than obkv when importing a dump (see the reading sketch after this list)
  - As the dump format itself didn't change, all dumps are still compatible
- Temporary workaround for https://github.com/meilisearch/meilisearch/issues/5247: set the batch uid of tasks to `null` at dump export time.
- Changes to meilitool
  - Export dump with update files in new format if DB >= v1.12
  - offline upgrade now supports upgrading from [1.9.0-1.12.5] to [1.10.0-1.12.5].
  - offline upgrade supports no-op upgrades and has better error messages 
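A minimal sketch of the reading side of the new update-file format, mirroring the `Deserializer::from_reader(..).into_iter()` pattern visible in the diff below; the function name and path handling are illustrative:

```rust
use std::fs::File;
use std::io::BufReader;

use serde_json::{Deserializer, Map, Value};

// Each update file is now a stream of JSON documents rather than an obkv
// batch, so it can be read object by object.
fn read_update_file(path: &str) -> anyhow::Result<Vec<Map<String, Value>>> {
    let file = BufReader::new(File::open(path)?);
    let mut documents = Vec::new();
    for document in Deserializer::from_reader(file).into_iter::<Map<String, Value>>() {
        documents.push(document?);
    }
    Ok(documents)
}
```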

Co-authored-by: Louis Dureuil <louis@meilisearch.com>
Co-authored-by: ManyTheFish <many@meilisearch.com>
2025-01-20 13:03:49 +00:00
c55891f73b Replace guards by OR patterns
Co-authored-by: Tamo <tamo@meilisearch.com>
2025-01-20 11:46:03 +01:00
40f8c0d840 Remove batch ids on export 2025-01-20 11:16:18 +01:00
34d8c1a903 Make offline upgrade more flexible 2025-01-20 10:43:47 +01:00
3c9483b6e0 meilitool dumps old-style dump for older DBs, otherwise new-style 2025-01-20 10:43:47 +01:00
8c789b3c7a Merge #5252
5252: Update version for the next release (v1.12.5) in Cargo.toml r=dureuill a=meili-bot

⚠️ This PR is automatically generated. Check the new version is the expected one and Cargo.lock has been updated before merging.

Co-authored-by: dureuill <dureuill@users.noreply.github.com>
2025-01-20 09:03:35 +00:00
3403eae9ee Update version for the next release (v1.12.5) in Cargo.toml 2025-01-20 08:53:20 +00:00
11458eefd9 Handle empty payloads 2025-01-20 09:51:07 +01:00
289eb92bef Fix warnings 2025-01-20 09:51:07 +01:00
cea0c89212 Change format of update file when importing dump 2025-01-20 09:51:07 +01:00
1cadab9ad8 Also fix dump import from meilitool 2025-01-20 09:51:07 +01:00
6383f8f19e Do not explode on missing content file if the task has no docs 2025-01-20 09:51:06 +01:00
8a9f952bda Create update files in new format 2025-01-20 09:51:06 +01:00
a5c44b4d79 Merge #5242
5242: Fix infinite loop r=Kerollmops a=dureuill

- Fix a possible infinite loop by releasing `writer_receiver` as soon as writing to the DB panics (illustrated by the sketch below)
- Demote panic to error log
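A hedged illustration of the deadlock class being fixed, using std channels as a stand-in for the real writer plumbing: as long as the receiving end stays alive, producers block on a full channel even though the consumer already panicked; dropping the receiver as soon as writing fails turns those sends into errors instead.

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::mpsc::Receiver;

fn consume(receiver: Receiver<Vec<u8>>) {
    let result = catch_unwind(AssertUnwindSafe(move || {
        for _entry in receiver.iter() {
            // Write the entry to the DB; this is the part that may panic.
        }
    }));
    // `receiver` was moved into the closure, so it is dropped as soon as the
    // closure exits, normally or by unwinding, which unblocks the senders.
    if result.is_err() {
        // Demoted from a panic to an error log, as in the PR.
        eprintln!("writing to the database panicked");
    }
}
```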

Co-authored-by: Louis Dureuil <louis@meilisearch.com>
2025-01-16 14:13:07 +00:00
8c35744848 Improve error log 2025-01-16 14:33:53 +01:00
c0d414fc3c Merge #5243
5243: Update version for the next release (v1.12.4) in Cargo.toml r=dureuill a=meili-bot

⚠️ This PR is automatically generated. Check the new version is the expected one and Cargo.lock has been updated before merging.

Co-authored-by: dureuill <dureuill@users.noreply.github.com>
2025-01-16 10:57:55 +00:00
b56358f606 Update version for the next release (v1.12.4) in Cargo.toml 2025-01-16 10:47:17 +00:00
b84c0a5390 Demote panic to error log 2025-01-16 11:38:21 +01:00
ce621e447e Release writer_receiver as soon as writing to db panics 2025-01-16 11:37:37 +01:00
aee74f47aa Merge pull request #5229 from meilisearch/improve-unknown-entry-deletion-report
Improve the panic message when deleting an unknown entry
2025-01-13 14:20:24 +01:00
be2717edbd Merge pull request #5224 from meilisearch/fix-facet-distribution
Fix facet distribution
2025-01-13 14:20:09 +01:00
c66841626e Update after review 2025-01-13 10:43:26 +01:00
d0bc8c755a Improve the panic message when deleting an unknown entry 2025-01-13 10:30:53 +01:00
031abfd281 Merge #5227
5227: Update version for the next release (v1.12.3) in Cargo.toml r=dureuill a=meili-bot

⚠️ This PR is automatically generated. Check the new version is the expected one and Cargo.lock has been updated before merging.

Co-authored-by: dureuill <dureuill@users.noreply.github.com>
2025-01-13 09:29:18 +00:00
27169bc7b4 Update version for the next release (v1.12.3) in Cargo.toml 2025-01-13 08:47:07 +00:00
181a01f8d8 Skip rebuilding field distribution if not coming from v1.12 2025-01-13 09:31:27 +01:00
1d153c1867 write stats after rebuilding facet distribution 2025-01-09 18:13:36 +01:00
5fde2a3ee1 Add support to upgrade to v1.12.3 in meilitool 2025-01-09 15:25:44 +01:00
4465a1a3c9 Fix test 2025-01-09 13:26:17 +01:00
e342ae1b46 Add currently failing test 2025-01-09 13:26:13 +01:00
dcb4c49cf2 Merge #5205
5205: Incremental facets on v1.12 r=curquiza a=dureuill

# Pull Request

## Related issue

Fixes https://github.com/meilisearch/meilisearch/issues/5213

## What does this PR do?
- Add a `new_incremental` module that computes incremental facet indexing for the new indexer
- Change the heuristics for choosing between incremental and bulk facet indexing: fewer than 1000 operations is now always incremental, more than 100_000 operations is now always bulk (a hedged sketch of these thresholds follows this list)
- Add sanity checks in debug builds
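The sketch below only encodes the thresholds stated above; the names and the tie-breaking rule for the in-between range are assumptions, not the actual milli code:

```rust
enum FacetIndexing {
    Incremental,
    Bulk,
}

fn choose_facet_indexing(operation_count: usize, database_entries: usize) -> FacetIndexing {
    if operation_count < 1000 {
        // Small updates always go through the incremental path.
        FacetIndexing::Incremental
    } else if operation_count > 100_000 {
        // Very large updates always rebuild the facet levels in bulk.
        FacetIndexing::Bulk
    } else if database_entries == 0 || operation_count * 2 >= database_entries {
        // Assumed tie-breaker: rewrite in bulk when the update touches a
        // large share of the existing entries.
        FacetIndexing::Bulk
    } else {
        FacetIndexing::Incremental
    }
}
```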

## Future improvements

- Use multi ops from Roaring to decrease the number of allocations
- Consider removing or adding multiple levels at once instead of max once per update
- Consider using information about the tree structure + the operations that were done (e.g. only addition) to avoid recomputing the group from all children 
- Consider making the algorithm parallel and looking into the roaring values to know which actually changed



Co-authored-by: Louis Dureuil <louis@meilisearch.com>
2025-01-08 21:52:02 +00:00
e83c021755 When spilling on the next fid, no longer ignore children 2025-01-08 16:50:05 +01:00
7ec7200378 Check valid_facet_value as part of a filter of the iterator 2025-01-08 16:25:44 +01:00
6a577254fa No longer ignore the first child without parent 2025-01-08 16:25:30 +01:00
fd88c834c3 Modernize valid_lmdb_key 2025-01-08 15:22:11 +01:00
b4005593f4 Switch to an iterative algorithm for find_changed_parents 2025-01-08 14:57:14 +01:00
8ee3793259 Update after review 2025-01-08 13:58:14 +01:00
3648abbfd5 Remove unused FacetFieldIdOperation 2025-01-07 15:26:09 +01:00
4d2433de12 center groups 2025-01-06 18:23:35 +01:00
28cc6df7a3 Fix uselessly deep stack trace 2025-01-06 18:07:49 +01:00
7b14cb10a1 Merge #5207
5207: Update version for the next release (v1.12.2) in Cargo.toml r=dureuill a=meili-bot

⚠️ This PR is automatically generated. Check the new version is the expected one and Cargo.lock has been updated before merging.

Co-authored-by: dureuill <dureuill@users.noreply.github.com>
2025-01-06 16:04:03 +00:00
34f4602ae8 Update snapshot 2025-01-06 16:55:12 +01:00
12e21a177b Update version for the next release (v1.12.2) in Cargo.toml 2025-01-06 14:11:58 +00:00
7a9290aaae Use new incremental facet indexing and enable sanity checks in debug 2025-01-06 15:08:48 +01:00
5d219587b8 Add new incremental facet indexing 2025-01-06 15:08:36 +01:00
6e9aa49893 add valid_facet_value utility function 2025-01-06 15:08:07 +01:00
6b3a2c7281 Add sanity checks for facet values 2025-01-06 15:07:55 +01:00
5908aec6cb Merge #5192
5192: Fix empty document addition r=irevoire a=irevoire

# Pull Request

## Related issue
Fixes #5190

## What does this PR do?
- Improve a test just to make sure this issue never arises again
- Fix the issue

For the reviewer: calling `add_documents` with an empty `mmap` seems to work, but does it impact performance in a significant way?

Co-authored-by: Tamo <tamo@meilisearch.com>
2024-12-31 17:11:10 +00:00
19f48c15fb Fix the addition of empty payload 2024-12-31 18:00:14 +01:00
47b484c07c update the test to ensure it works when specifying the primary key or not: it doesn't work 2024-12-31 17:24:32 +01:00
7d5e28b475 Merge #5193
5193: Update version for the next release (v1.12.1) in Cargo.toml r=irevoire a=meili-bot

⚠️ This PR is automatically generated. Check the new version is the expected one and Cargo.lock has been updated before merging.

Co-authored-by: curquiza <curquiza@users.noreply.github.com>
2024-12-31 09:40:31 +00:00
0648e06aa2 Update version for the next release (v1.12.1) in Cargo.toml 2024-12-30 17:36:46 +00:00
33921747b7 stop skipping empty tasks when adding documents 2024-12-30 17:48:25 +01:00
970a489dcc add a test reproducing the bug 2024-12-30 16:21:06 +01:00
45 changed files with 1788 additions and 297 deletions

Cargo.lock generated
View File

@ -496,7 +496,7 @@ source = "git+https://github.com/meilisearch/bbqueue#cbb87cc707b5af415ef203bdaf2
[[package]]
name = "benchmarks"
version = "1.12.0"
version = "1.12.7"
dependencies = [
"anyhow",
"bumpalo",
@ -689,7 +689,7 @@ dependencies = [
[[package]]
name = "build-info"
version = "1.12.0"
version = "1.12.7"
dependencies = [
"anyhow",
"time",
@ -1664,7 +1664,7 @@ dependencies = [
[[package]]
name = "dump"
version = "1.12.0"
version = "1.12.7"
dependencies = [
"anyhow",
"big_s",
@ -1876,7 +1876,7 @@ checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4"
[[package]]
name = "file-store"
version = "1.12.0"
version = "1.12.7"
dependencies = [
"tempfile",
"thiserror",
@ -1898,7 +1898,7 @@ dependencies = [
[[package]]
name = "filter-parser"
version = "1.12.0"
version = "1.12.7"
dependencies = [
"insta",
"nom",
@ -1918,7 +1918,7 @@ dependencies = [
[[package]]
name = "flatten-serde-json"
version = "1.12.0"
version = "1.12.7"
dependencies = [
"criterion",
"serde_json",
@ -2057,7 +2057,7 @@ dependencies = [
[[package]]
name = "fuzzers"
version = "1.12.0"
version = "1.12.7"
dependencies = [
"arbitrary",
"bumpalo",
@ -2624,7 +2624,7 @@ checksum = "206ca75c9c03ba3d4ace2460e57b189f39f43de612c2f85836e65c929701bb2d"
[[package]]
name = "index-scheduler"
version = "1.12.0"
version = "1.12.7"
dependencies = [
"anyhow",
"arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -2822,7 +2822,7 @@ dependencies = [
[[package]]
name = "json-depth-checker"
version = "1.12.0"
version = "1.12.7"
dependencies = [
"criterion",
"serde_json",
@ -3441,7 +3441,7 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "meili-snap"
version = "1.12.0"
version = "1.12.7"
dependencies = [
"insta",
"md5",
@ -3450,7 +3450,7 @@ dependencies = [
[[package]]
name = "meilisearch"
version = "1.12.0"
version = "1.12.7"
dependencies = [
"actix-cors",
"actix-http",
@ -3540,7 +3540,7 @@ dependencies = [
[[package]]
name = "meilisearch-auth"
version = "1.12.0"
version = "1.12.7"
dependencies = [
"base64 0.22.1",
"enum-iterator",
@ -3559,7 +3559,7 @@ dependencies = [
[[package]]
name = "meilisearch-types"
version = "1.12.0"
version = "1.12.7"
dependencies = [
"actix-web",
"anyhow",
@ -3592,7 +3592,7 @@ dependencies = [
[[package]]
name = "meilitool"
version = "1.12.0"
version = "1.12.7"
dependencies = [
"anyhow",
"arroy 0.5.0 (git+https://github.com/meilisearch/arroy/?tag=DO-NOT-DELETE-upgrade-v04-to-v05)",
@ -3627,7 +3627,7 @@ dependencies = [
[[package]]
name = "milli"
version = "1.12.0"
version = "1.12.7"
dependencies = [
"allocator-api2",
"arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -4083,7 +4083,7 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
name = "permissive-json-pointer"
version = "1.12.0"
version = "1.12.7"
dependencies = [
"big_s",
"serde_json",
@ -6486,7 +6486,7 @@ dependencies = [
[[package]]
name = "xtask"
version = "1.12.0"
version = "1.12.7"
dependencies = [
"anyhow",
"build-info",

View File

@ -22,7 +22,7 @@ members = [
]
[workspace.package]
version = "1.12.0"
version = "1.12.7"
authors = [
"Quentin de Quelen <quentin@dequelen.me>",
"Clément Renault <clement@meilisearch.com>",

View File

@ -29,7 +29,7 @@ use bumpalo::Bump;
use dump::IndexMetadata;
use meilisearch_types::batches::BatchId;
use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey};
use meilisearch_types::milli::documents::PrimaryKey;
use meilisearch_types::milli::heed::CompactionOption;
use meilisearch_types::milli::progress::Progress;
use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction};
@ -819,6 +819,13 @@ impl IndexScheduler {
t.started_at = Some(started_at);
t.finished_at = Some(finished_at);
}
// Patch the task to remove the batch uid, because as of v1.12.5 batches are not persisted.
// This prevent from referencing *future* batches not actually associated with the task.
//
// See <https://github.com/meilisearch/meilisearch/issues/5247> for details.
t.batch_uid = None;
let mut dump_content_file = dump_tasks.push_task(&t.into())?;
// 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
@ -829,21 +836,20 @@ impl IndexScheduler {
if status == Status::Enqueued {
let content_file = self.file_store.get_update(content_file)?;
let reader = DocumentsBatchReader::from_reader(content_file)
.map_err(|e| Error::from_milli(e.into(), None))?;
let (mut cursor, documents_batch_index) =
reader.into_cursor_and_fields_index();
while let Some(doc) = cursor
.next_document()
.map_err(|e| Error::from_milli(e.into(), None))?
for document in
serde_json::de::Deserializer::from_reader(content_file).into_iter()
{
dump_content_file.push_document(
&obkv_to_object(doc, &documents_batch_index)
.map_err(|e| Error::from_milli(e, None))?,
)?;
let document = document
.map_err(|e| {
Error::from_milli(
milli::InternalError::SerdeJson(e).into(),
None,
)
})
.unwrap();
dump_content_file.push_document(&document)?;
}
dump_content_file.flush()?;
}
}
@ -1312,9 +1318,7 @@ impl IndexScheduler {
if let DocumentOperation::Add(content_uuid) = operation {
let content_file = self.file_store.get_update(*content_uuid)?;
let mmap = unsafe { memmap2::Mmap::map(&content_file)? };
if !mmap.is_empty() {
content_files.push(mmap);
}
content_files.push(mmap);
}
}

View File

@ -1,5 +1,7 @@
use std::collections::BTreeMap;
use std::env::VarError;
use std::path::Path;
use std::str::FromStr;
use std::time::Duration;
use meilisearch_types::heed::{EnvClosingEvent, EnvFlags, EnvOpenOptions};
@ -302,7 +304,15 @@ fn create_or_open_index(
) -> Result<Index> {
let mut options = EnvOpenOptions::new();
options.map_size(clamp_to_page_size(map_size));
options.max_readers(1024);
let max_readers = match std::env::var("MEILI_EXPERIMENTAL_INDEX_MAX_READERS") {
Ok(value) => u32::from_str(&value).unwrap(),
Err(VarError::NotPresent) => 1024,
Err(VarError::NotUnicode(value)) => panic!(
"Invalid unicode for the `MEILI_EXPERIMENTAL_INDEX_MAX_READERS` env var: {value:?}"
),
};
options.max_readers(max_readers);
if enable_mdb_writemap {
unsafe { options.flags(EnvFlags::WRITE_MAP) };
}

View File

@ -55,7 +55,6 @@ use meilisearch_types::features::{InstanceTogglableFeatures, RuntimeTogglableFea
use meilisearch_types::heed::byteorder::BE;
use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str, I128};
use meilisearch_types::heed::{self, Database, Env, PutFlags, RoTxn, RwTxn};
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
use meilisearch_types::milli::index::IndexEmbeddingConfig;
use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::milli::vector::{Embedder, EmbedderOptions, EmbeddingConfigs};
@ -2017,14 +2016,21 @@ impl<'a> Dump<'a> {
task: TaskDump,
content_file: Option<Box<UpdateFile>>,
) -> Result<Task> {
let task_has_no_docs = matches!(task.kind, KindDump::DocumentImport { documents_count, .. } if documents_count == 0);
let content_uuid = match content_file {
Some(content_file) if task.status == Status::Enqueued => {
let (uuid, mut file) = self.index_scheduler.create_update_file(false)?;
let mut builder = DocumentsBatchBuilder::new(&mut file);
let (uuid, file) = self.index_scheduler.create_update_file(false)?;
let mut writer = io::BufWriter::new(file);
for doc in content_file {
builder.append_json_object(&doc?)?;
let doc = doc?;
serde_json::to_writer(&mut writer, &doc)
.map_err(|e| {
Error::from_milli(milli::InternalError::SerdeJson(e).into(), None)
})
.unwrap();
}
builder.into_inner()?;
let file = writer.into_inner().map_err(|e| e.into_error())?;
file.persist()?;
Some(uuid)
@ -2032,6 +2038,12 @@ impl<'a> Dump<'a> {
// If the task isn't `Enqueued` then just generate a recognisable `Uuid`
// in case we try to open it later.
_ if task.status != Status::Enqueued => Some(Uuid::nil()),
None if task.status == Status::Enqueued && task_has_no_docs => {
let (uuid, file) = self.index_scheduler.create_update_file(false)?;
file.persist()?;
Some(uuid)
}
_ => None,
};

View File

@ -291,7 +291,10 @@ impl IndexScheduler {
debug_assert!(old_task != *task);
debug_assert_eq!(old_task.uid, task.uid);
debug_assert!(old_task.batch_uid.is_none() && task.batch_uid.is_some());
debug_assert!(
old_task.batch_uid.is_none() && task.batch_uid.is_some(),
"\n==> old: {old_task:?}\n==> new: {task:?}"
);
if old_task.status != task.status {
self.update_status(wtxn, old_task.status, |bitmap| {

View File

@ -1337,7 +1337,7 @@ impl<'a> HitMaker<'a> {
ExplicitVectors { embeddings: Some(vector.into()), regenerate: !user_provided };
vectors.insert(
name,
serde_json::to_value(embeddings).map_err(InternalError::SerdeJson)?,
serde_json::to_value(embeddings).map_err(InternalError::SerdeJson).unwrap(),
);
}
document.insert("_vectors".into(), vectors.into());
@ -1717,7 +1717,7 @@ fn make_document(
// recreate the original json
for (key, value) in obkv.iter() {
let value = serde_json::from_slice(value).map_err(InternalError::SerdeJson)?;
let value = serde_json::from_slice(value).map_err(InternalError::SerdeJson).unwrap();
let key = field_ids_map.name(key).expect("Missing field name").to_string();
document.insert(key, value);

View File

@ -1220,9 +1220,89 @@ async fn replace_document() {
#[actix_rt::test]
async fn add_no_documents() {
let server = Server::new().await;
let index = server.index("test");
let (_response, code) = index.add_documents(json!([]), None).await;
let index = server.index("kefir");
let (task, code) = index.add_documents(json!([]), None).await;
snapshot!(code, @"202 Accepted");
let task = server.wait_task(task.uid()).await;
let task = task.succeeded();
snapshot!(task, @r#"
{
"uid": "[uid]",
"batchUid": "[batch_uid]",
"indexUid": "kefir",
"status": "succeeded",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 0,
"indexedDocuments": 0
},
"error": null,
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
let (task, _code) = index.add_documents(json!([]), Some("kefkef")).await;
let task = server.wait_task(task.uid()).await;
let task = task.succeeded();
snapshot!(task, @r#"
{
"uid": "[uid]",
"batchUid": "[batch_uid]",
"indexUid": "kefir",
"status": "succeeded",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 0,
"indexedDocuments": 0
},
"error": null,
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
let (task, _code) = index.add_documents(json!([{ "kefkef": 1 }]), None).await;
let task = server.wait_task(task.uid()).await;
let task = task.succeeded();
snapshot!(task, @r#"
{
"uid": "[uid]",
"batchUid": "[batch_uid]",
"indexUid": "kefir",
"status": "succeeded",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 1,
"indexedDocuments": 1
},
"error": null,
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
let (documents, _status) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
snapshot!(documents, @r#"
{
"results": [
{
"kefkef": 1
}
],
"offset": 0,
"limit": 20,
"total": 1
}
"#);
}
#[actix_rt::test]

View File

@ -1746,3 +1746,57 @@ async fn change_attributes_settings() {
)
.await;
}
/// Modifying facets with different casing should work correctly
#[actix_rt::test]
async fn change_facet_casing() {
let server = Server::new().await;
let index = server.index("test");
let (response, code) = index
.update_settings(json!({
"filterableAttributes": ["dog"],
}))
.await;
assert_eq!("202", code.as_str(), "{:?}", response);
index.wait_task(response.uid()).await;
let (response, _code) = index
.add_documents(
json!([
{
"id": 1,
"dog": "Bouvier Bernois"
}
]),
None,
)
.await;
index.wait_task(response.uid()).await;
let (response, _code) = index
.add_documents(
json!([
{
"id": 1,
"dog": "bouvier bernois"
}
]),
None,
)
.await;
index.wait_task(response.uid()).await;
index
.search(json!({ "facets": ["dog"] }), |response, code| {
meili_snap::snapshot!(code, @"200 OK");
meili_snap::snapshot!(meili_snap::json_string!(response["facetDistribution"]), @r###"
{
"dog": {
"bouvier bernois": 1
}
}
"###);
})
.await;
}

View File

@ -14,11 +14,11 @@ arroy_v04_to_v05 = { package = "arroy", git = "https://github.com/meilisearch/ar
clap = { version = "4.5.9", features = ["derive"] }
dump = { path = "../dump" }
file-store = { path = "../file-store" }
indexmap = {version = "2.7.0", features = ["serde"]}
indexmap = { version = "2.7.0", features = ["serde"] }
meilisearch-auth = { path = "../meilisearch-auth" }
meilisearch-types = { path = "../meilisearch-types" }
serde = { version = "1.0.209", features = ["derive"] }
serde_json = {version = "1.0.133", features = ["preserve_order"]}
serde_json = { version = "1.0.133", features = ["preserve_order"] }
tempfile = "3.14.0"
time = { version = "0.3.36", features = ["formatting", "parsing", "alloc"] }
uuid = { version = "1.10.0", features = ["v4"], default-features = false }

View File

@ -88,7 +88,7 @@ fn main() -> anyhow::Result<()> {
match command {
Command::ClearTaskQueue => clear_task_queue(db_path),
Command::ExportADump { dump_dir, skip_enqueued_tasks } => {
export_a_dump(db_path, dump_dir, skip_enqueued_tasks)
export_a_dump(db_path, dump_dir, skip_enqueued_tasks, detected_version)
}
Command::OfflineUpgrade { target_version } => {
let target_version = parse_version(&target_version).context("While parsing `--target-version`. Make sure `--target-version` is in the format MAJOR.MINOR.PATCH")?;
@ -187,6 +187,7 @@ fn export_a_dump(
db_path: PathBuf,
dump_dir: PathBuf,
skip_enqueued_tasks: bool,
detected_version: (String, String, String),
) -> Result<(), anyhow::Error> {
let started_at = OffsetDateTime::now_utc();
@ -238,9 +239,6 @@ fn export_a_dump(
if skip_enqueued_tasks {
eprintln!("Skip dumping the enqueued tasks...");
} else {
eprintln!("Dumping the enqueued tasks...");
// 3. dump the tasks
let mut dump_tasks = dump.create_tasks_queue()?;
let mut count = 0;
for ret in all_tasks.iter(&rtxn)? {
@ -254,18 +252,39 @@ fn export_a_dump(
if status == Status::Enqueued {
let content_file = file_store.get_update(content_file_uuid)?;
let reader =
DocumentsBatchReader::from_reader(content_file).with_context(|| {
format!("While reading content file {:?}", content_file_uuid)
})?;
let (mut cursor, documents_batch_index) = reader.into_cursor_and_fields_index();
while let Some(doc) = cursor.next_document().with_context(|| {
format!("While iterating on content file {:?}", content_file_uuid)
})? {
dump_content_file
.push_document(&obkv_to_object(doc, &documents_batch_index)?)?;
if (
detected_version.0.as_str(),
detected_version.1.as_str(),
detected_version.2.as_str(),
) < ("1", "12", "0")
{
eprintln!("Dumping the enqueued tasks reading them in obkv format...");
let reader =
DocumentsBatchReader::from_reader(content_file).with_context(|| {
format!("While reading content file {:?}", content_file_uuid)
})?;
let (mut cursor, documents_batch_index) =
reader.into_cursor_and_fields_index();
while let Some(doc) = cursor.next_document().with_context(|| {
format!("While iterating on content file {:?}", content_file_uuid)
})? {
dump_content_file
.push_document(&obkv_to_object(doc, &documents_batch_index)?)?;
}
} else {
eprintln!(
"Dumping the enqueued tasks reading them in JSON stream format..."
);
for document in
serde_json::de::Deserializer::from_reader(content_file).into_iter()
{
let document = document.with_context(|| {
format!("While reading content file {:?}", content_file_uuid)
})?;
dump_content_file.push_document(&document)?;
}
}
dump_content_file.flush()?;
count += 1;
}

View File

@ -8,7 +8,7 @@ use std::path::{Path, PathBuf};
use anyhow::{bail, Context};
use meilisearch_types::versioning::create_version_file;
use v1_10::v1_9_to_v1_10;
use v1_12::v1_11_to_v1_12;
use v1_12::{v1_11_to_v1_12, v1_12_to_v1_12_3};
use crate::upgrade::v1_11::v1_10_to_v1_11;
@ -20,12 +20,48 @@ pub struct OfflineUpgrade {
impl OfflineUpgrade {
pub fn upgrade(self) -> anyhow::Result<()> {
// Adding a version?
//
// 1. Update the LAST_SUPPORTED_UPGRADE_FROM_VERSION and LAST_SUPPORTED_UPGRADE_TO_VERSION.
// 2. Add new version to the upgrade list if necessary
// 3. Use `no_upgrade` as index for versions that are compatible.
if self.current_version == self.target_version {
println!("Database is already at the target version. Exiting.");
return Ok(());
}
if self.current_version > self.target_version {
bail!(
"Cannot downgrade from {}.{}.{} to {}.{}.{}. Downgrade not supported",
self.current_version.0,
self.current_version.1,
self.current_version.2,
self.target_version.0,
self.target_version.1,
self.target_version.2
);
}
const FIRST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.9.0";
const LAST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.12.7";
const FIRST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.10.0";
const LAST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.12.7";
let upgrade_list = [
(v1_9_to_v1_10 as fn(&Path) -> Result<(), anyhow::Error>, "1", "10", "0"),
(
v1_9_to_v1_10 as fn(&Path, &str, &str, &str) -> Result<(), anyhow::Error>,
"1",
"10",
"0",
),
(v1_10_to_v1_11, "1", "11", "0"),
(v1_11_to_v1_12, "1", "12", "0"),
(v1_12_to_v1_12_3, "1", "12", "3"),
];
let no_upgrade: usize = upgrade_list.len();
let (current_major, current_minor, current_patch) = &self.current_version;
let start_at = match (
@ -36,8 +72,12 @@ impl OfflineUpgrade {
("1", "9", _) => 0,
("1", "10", _) => 1,
("1", "11", _) => 2,
("1", "12", "0" | "1" | "2") => 3,
("1", "12", "3" | "4" | "5" | "6" | "7") => no_upgrade,
_ => {
bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from v1.9 and v1.10")
bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from versions in range [{}-{}]",
FIRST_SUPPORTED_UPGRADE_FROM_VERSION,
LAST_SUPPORTED_UPGRADE_FROM_VERSION);
}
};
@ -46,21 +86,32 @@ impl OfflineUpgrade {
let ends_at = match (target_major.as_str(), target_minor.as_str(), target_patch.as_str()) {
("1", "10", _) => 0,
("1", "11", _) => 1,
("1", "12", _) => 2,
("1", "12", "0" | "1" | "2") => 2,
("1", "12", "3" | "4" | "5" | "6" | "7") => 3,
(major, _, _) if major.starts_with('v') => {
bail!("Target version must not starts with a `v`. Instead of writing `v1.9.0` write `1.9.0` for example.")
}
_ => {
bail!("Unsupported target version {target_major}.{target_minor}.{target_patch}. Can only upgrade to v1.10 and v1.11")
bail!("Unsupported target version {target_major}.{target_minor}.{target_patch}. Can only upgrade to versions in range [{}-{}]",
FIRST_SUPPORTED_UPGRADE_TO_VERSION,
LAST_SUPPORTED_UPGRADE_TO_VERSION);
}
};
println!("Starting the upgrade from {current_major}.{current_minor}.{current_patch} to {target_major}.{target_minor}.{target_patch}");
if start_at == no_upgrade {
println!("No upgrade operation to perform, writing VERSION file");
create_version_file(&self.db_path, target_major, target_minor, target_patch)
.context("while writing VERSION file after the upgrade")?;
println!("Success");
return Ok(());
}
#[allow(clippy::needless_range_loop)]
for index in start_at..=ends_at {
let (func, major, minor, patch) = upgrade_list[index];
(func)(&self.db_path)?;
(func)(&self.db_path, current_major, current_minor, current_patch)?;
println!("Done");
// We're writing the version file just in case an issue arise _while_ upgrading.
// We don't want the DB to fail in an unknown state.

View File

@ -151,7 +151,12 @@ fn date_round_trip(
Ok(())
}
pub fn v1_9_to_v1_10(db_path: &Path) -> anyhow::Result<()> {
pub fn v1_9_to_v1_10(
db_path: &Path,
_origin_major: &str,
_origin_minor: &str,
_origin_patch: &str,
) -> anyhow::Result<()> {
println!("Upgrading from v1.9.0 to v1.10.0");
// 2 changes here

View File

@ -14,7 +14,12 @@ use meilisearch_types::milli::index::db_name;
use crate::uuid_codec::UuidCodec;
use crate::{try_opening_database, try_opening_poly_database};
pub fn v1_10_to_v1_11(db_path: &Path) -> anyhow::Result<()> {
pub fn v1_10_to_v1_11(
db_path: &Path,
_origin_major: &str,
_origin_minor: &str,
_origin_patch: &str,
) -> anyhow::Result<()> {
println!("Upgrading from v1.10.0 to v1.11.0");
let index_scheduler_path = db_path.join("tasks");

View File

@ -1,17 +1,34 @@
//! The breaking changes that happened between the v1.11 and the v1.12 are:
//! - The new indexer changed the update files format from OBKV to ndjson. https://github.com/meilisearch/meilisearch/pull/4900
use std::borrow::Cow;
use std::io::BufWriter;
use std::path::Path;
use std::sync::atomic::AtomicBool;
use anyhow::Context;
use file_store::FileStore;
use indexmap::IndexMap;
use meilisearch_types::milli::documents::DocumentsBatchReader;
use meilisearch_types::milli::heed::types::{SerdeJson, Str};
use meilisearch_types::milli::heed::{Database, EnvOpenOptions, RoTxn, RwTxn};
use meilisearch_types::milli::progress::Step;
use meilisearch_types::milli::{FieldDistribution, Index};
use serde::Serialize;
use serde_json::value::RawValue;
use tempfile::NamedTempFile;
use time::OffsetDateTime;
use uuid::Uuid;
pub fn v1_11_to_v1_12(db_path: &Path) -> anyhow::Result<()> {
use crate::try_opening_database;
use crate::uuid_codec::UuidCodec;
pub fn v1_11_to_v1_12(
db_path: &Path,
_origin_major: &str,
_origin_minor: &str,
_origin_patch: &str,
) -> anyhow::Result<()> {
println!("Upgrading from v1.11.0 to v1.12.0");
convert_update_files(db_path)?;
@ -19,6 +36,23 @@ pub fn v1_11_to_v1_12(db_path: &Path) -> anyhow::Result<()> {
Ok(())
}
pub fn v1_12_to_v1_12_3(
db_path: &Path,
origin_major: &str,
origin_minor: &str,
origin_patch: &str,
) -> anyhow::Result<()> {
println!("Upgrading from v1.12.{{0, 1, 2}} to v1.12.3");
if origin_minor == "12" {
rebuild_field_distribution(db_path)?;
} else {
println!("Not rebuilding field distribution as it wasn't corrupted coming from v{origin_major}.{origin_minor}.{origin_patch}");
}
Ok(())
}
/// Convert the update files from OBKV to ndjson format.
///
/// 1) List all the update files using the file store.
@ -77,3 +111,188 @@ fn convert_update_files(db_path: &Path) -> anyhow::Result<()> {
Ok(())
}
/// Rebuild field distribution as it was wrongly computed in v1.12.x if x < 3
fn rebuild_field_distribution(db_path: &Path) -> anyhow::Result<()> {
let index_scheduler_path = db_path.join("tasks");
let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
.with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
let mut sched_wtxn = env.write_txn()?;
let index_mapping: Database<Str, UuidCodec> =
try_opening_database(&env, &sched_wtxn, "index-mapping")?;
let stats_db: Database<UuidCodec, SerdeJson<IndexStats>> =
try_opening_database(&env, &sched_wtxn, "index-stats").with_context(|| {
format!("While trying to open {:?}", index_scheduler_path.display())
})?;
let index_count =
index_mapping.len(&sched_wtxn).context("while reading the number of indexes")?;
// FIXME: not ideal, we have to pre-populate all indexes to prevent double borrow of sched_wtxn
// 1. immutably for the iteration
// 2. mutably for updating index stats
let indexes: Vec<_> = index_mapping
.iter(&sched_wtxn)?
.map(|res| res.map(|(uid, uuid)| (uid.to_owned(), uuid)))
.collect();
let progress = meilisearch_types::milli::progress::Progress::default();
let finished = AtomicBool::new(false);
std::thread::scope(|scope| {
let display_progress = std::thread::Builder::new()
.name("display_progress".into())
.spawn_scoped(scope, || {
while !finished.load(std::sync::atomic::Ordering::Relaxed) {
std::thread::sleep(std::time::Duration::from_secs(5));
let view = progress.as_progress_view();
let Ok(view) = serde_json::to_string(&view) else {
continue;
};
println!("{view}");
}
})
.unwrap();
for (index_index, result) in indexes.into_iter().enumerate() {
let (uid, uuid) = result?;
progress.update_progress(VariableNameStep::new(
&uid,
index_index as u32,
index_count as u32,
));
let index_path = db_path.join("indexes").join(uuid.to_string());
println!(
"[{}/{index_count}]Updating index `{uid}` at `{}`",
index_index + 1,
index_path.display()
);
println!("\t- Rebuilding field distribution");
let index = meilisearch_types::milli::Index::new(EnvOpenOptions::new(), &index_path)
.with_context(|| {
format!("while opening index {uid} at '{}'", index_path.display())
})?;
let mut index_txn = index.write_txn()?;
meilisearch_types::milli::update::new::reindex::field_distribution(
&index,
&mut index_txn,
&progress,
)
.context("while rebuilding field distribution")?;
let stats = IndexStats::new(&index, &index_txn)
.with_context(|| format!("computing stats for index `{uid}`"))?;
store_stats_of(stats_db, uuid, &mut sched_wtxn, &uid, &stats)?;
index_txn.commit().context("while committing the write txn for the updated index")?;
}
sched_wtxn.commit().context("while committing the write txn for the index-scheduler")?;
finished.store(true, std::sync::atomic::Ordering::Relaxed);
if let Err(panic) = display_progress.join() {
let msg = match panic.downcast_ref::<&'static str>() {
Some(s) => *s,
None => match panic.downcast_ref::<String>() {
Some(s) => &s[..],
None => "Box<dyn Any>",
},
};
eprintln!("WARN: the display thread panicked with {msg}");
}
println!("Upgrading database succeeded");
Ok(())
})
}
pub struct VariableNameStep {
name: String,
current: u32,
total: u32,
}
impl VariableNameStep {
pub fn new(name: impl Into<String>, current: u32, total: u32) -> Self {
Self { name: name.into(), current, total }
}
}
impl Step for VariableNameStep {
fn name(&self) -> Cow<'static, str> {
self.name.clone().into()
}
fn current(&self) -> u32 {
self.current
}
fn total(&self) -> u32 {
self.total
}
}
pub fn store_stats_of(
stats_db: Database<UuidCodec, SerdeJson<IndexStats>>,
index_uuid: Uuid,
sched_wtxn: &mut RwTxn,
index_uid: &str,
stats: &IndexStats,
) -> anyhow::Result<()> {
stats_db
.put(sched_wtxn, &index_uuid, stats)
.with_context(|| format!("storing stats for index `{index_uid}`"))?;
Ok(())
}
/// The statistics that can be computed from an `Index` object.
#[derive(Serialize, Debug)]
pub struct IndexStats {
/// Number of documents in the index.
pub number_of_documents: u64,
/// Size taken up by the index' DB, in bytes.
///
/// This includes the size taken by both the used and free pages of the DB, and as the free pages
/// are not returned to the disk after a deletion, this number is typically larger than
/// `used_database_size` that only includes the size of the used pages.
pub database_size: u64,
/// Size taken by the used pages of the index' DB, in bytes.
///
/// As the DB backend does not return to the disk the pages that are not currently used by the DB,
/// this value is typically smaller than `database_size`.
pub used_database_size: u64,
/// Association of every field name with the number of times it occurs in the documents.
pub field_distribution: FieldDistribution,
/// Creation date of the index.
#[serde(with = "time::serde::rfc3339")]
pub created_at: OffsetDateTime,
/// Date of the last update of the index.
#[serde(with = "time::serde::rfc3339")]
pub updated_at: OffsetDateTime,
}
impl IndexStats {
/// Compute the stats of an index
///
/// # Parameters
///
/// - rtxn: a RO transaction for the index, obtained from `Index::read_txn()`.
pub fn new(index: &Index, rtxn: &RoTxn) -> meilisearch_types::milli::Result<Self> {
Ok(IndexStats {
number_of_documents: index.number_of_documents(rtxn)?,
database_size: index.on_disk_size()?,
used_database_size: index.used_size()?,
field_distribution: index.field_distribution(rtxn)?,
created_at: index.created_at(rtxn)?,
updated_at: index.updated_at(rtxn)?,
})
}
}

View File

@ -33,7 +33,7 @@ pub fn obkv_to_object(obkv: &KvReader<FieldId>, index: &DocumentsBatchIndex) ->
let field_name = index
.name(field_id)
.ok_or(FieldIdMapMissingEntry::FieldId { field_id, process: "obkv_to_object" })?;
let value = serde_json::from_slice(value).map_err(InternalError::SerdeJson)?;
let value = serde_json::from_slice(value).map_err(InternalError::SerdeJson).unwrap();
Ok((field_name.to_string(), value))
})
.collect()
@ -84,7 +84,8 @@ impl DocumentsBatchIndex {
let key =
self.0.get_by_left(&k).ok_or(crate::error::InternalError::DatabaseClosing)?.clone();
let value = serde_json::from_slice::<serde_json::Value>(v)
.map_err(crate::error::InternalError::SerdeJson)?;
.map_err(crate::error::InternalError::SerdeJson)
.unwrap();
map.insert(key, value);
}

View File

@ -92,7 +92,8 @@ impl<'a> PrimaryKey<'a> {
PrimaryKey::Flat { name: _, field_id } => match document.get(*field_id) {
Some(document_id_bytes) => {
let document_id = serde_json::from_slice(document_id_bytes)
.map_err(InternalError::SerdeJson)?;
.map_err(InternalError::SerdeJson)
.unwrap();
match validate_document_id_value(document_id) {
Ok(document_id) => Ok(Ok(document_id)),
Err(user_error) => {
@ -108,7 +109,8 @@ impl<'a> PrimaryKey<'a> {
if let Some(field_id) = fields.id(first_level_name) {
if let Some(value_bytes) = document.get(field_id) {
let object = serde_json::from_slice(value_bytes)
.map_err(InternalError::SerdeJson)?;
.map_err(InternalError::SerdeJson)
.unwrap();
fetch_matching_values(object, right, &mut matching_documents_ids);
if matching_documents_ids.len() >= 2 {
@ -151,11 +153,12 @@ impl<'a> PrimaryKey<'a> {
};
let document_id: &RawValue =
serde_json::from_slice(document_id).map_err(InternalError::SerdeJson)?;
serde_json::from_slice(document_id).map_err(InternalError::SerdeJson).unwrap();
let document_id = document_id
.deserialize_any(crate::update::new::indexer::de::DocumentIdVisitor(indexer))
.map_err(InternalError::SerdeJson)?;
.map_err(InternalError::SerdeJson)
.unwrap();
let external_document_id = match document_id {
Ok(document_id) => Ok(document_id),
@ -173,7 +176,7 @@ impl<'a> PrimaryKey<'a> {
let Some(value) = document.get(fid) else { continue };
let value: &RawValue =
serde_json::from_slice(value).map_err(InternalError::SerdeJson)?;
serde_json::from_slice(value).map_err(InternalError::SerdeJson).unwrap();
match match_component(first_level, right, value, indexer, &mut docid) {
ControlFlow::Continue(()) => continue,
ControlFlow::Break(Ok(_)) => {
@ -183,7 +186,7 @@ impl<'a> PrimaryKey<'a> {
.into())
}
ControlFlow::Break(Err(err)) => {
return Err(InternalError::SerdeJson(err).into())
panic!("{err}");
}
}
}

View File

@ -228,7 +228,8 @@ pub fn obkv_to_json(
field_id: id,
process: "obkv_to_json",
})?;
let value = serde_json::from_slice(value).map_err(error::InternalError::SerdeJson)?;
let value =
serde_json::from_slice(value).map_err(error::InternalError::SerdeJson).unwrap();
Ok((name.to_owned(), value))
})
.collect()

View File

@ -219,12 +219,19 @@ impl<'a> FacetDistribution<'a> {
let facet_key = StrRefCodec::bytes_decode(facet_key).unwrap();
let key: (FieldId, _, &str) = (field_id, any_docid, facet_key);
let original_string = self
.index
.field_id_docid_facet_strings
.get(self.rtxn, &key)?
.unwrap()
.to_owned();
let optional_original_string =
self.index.field_id_docid_facet_strings.get(self.rtxn, &key)?;
let original_string = match optional_original_string {
Some(original_string) => original_string.to_owned(),
None => {
tracing::error!(
"Missing original facet string. Using the normalized facet {} instead",
facet_key
);
facet_key.to_string()
}
};
distribution.insert(original_string, nbr_docids);
if distribution.len() == self.max_values_per_facet {

View File

@ -1,4 +1,4 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use rayon::{ThreadPool, ThreadPoolBuilder};
@ -9,6 +9,8 @@ use thiserror::Error;
#[derive(Debug)]
pub struct ThreadPoolNoAbort {
thread_pool: ThreadPool,
/// The number of active operations.
active_operations: AtomicUsize,
/// Set to true if the thread pool catched a panic.
pool_catched_panic: Arc<AtomicBool>,
}
@ -19,7 +21,9 @@ impl ThreadPoolNoAbort {
OP: FnOnce() -> R + Send,
R: Send,
{
self.active_operations.fetch_add(1, Ordering::Relaxed);
let output = self.thread_pool.install(op);
self.active_operations.fetch_sub(1, Ordering::Relaxed);
// While reseting the pool panic catcher we return an error if we catched one.
if self.pool_catched_panic.swap(false, Ordering::SeqCst) {
Err(PanicCatched)
@ -31,6 +35,11 @@ impl ThreadPoolNoAbort {
pub fn current_num_threads(&self) -> usize {
self.thread_pool.current_num_threads()
}
/// The number of active operations.
pub fn active_operations(&self) -> usize {
self.active_operations.load(Ordering::Relaxed)
}
}
#[derive(Error, Debug)]
@ -64,6 +73,10 @@ impl ThreadPoolNoAbortBuilder {
let catched_panic = pool_catched_panic.clone();
move |_result| catched_panic.store(true, Ordering::SeqCst)
});
Ok(ThreadPoolNoAbort { thread_pool: self.0.build()?, pool_catched_panic })
Ok(ThreadPoolNoAbort {
thread_pool: self.0.build()?,
active_operations: AtomicUsize::new(0),
pool_catched_panic,
})
}
}
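A hedged usage sketch, not part of the diff above: one plausible use of the new `active_operations` counter is to avoid nesting more work into an already busy pool, which is what the "Do not create too many rayon tasks" commits are about. It assumes the `ThreadPoolNoAbort` and `PanicCatched` types defined above are in scope; the caller is illustrative, not the actual embedder code.

```rust
fn run_or_inline<OP, R>(pool: &ThreadPoolNoAbort, op: OP) -> Result<R, PanicCatched>
where
    OP: FnOnce() -> R + Send,
    R: Send,
{
    if pool.active_operations() == 0 {
        // The pool is idle: run the operation on it to get parallelism.
        pool.install(op)
    } else {
        // The pool is already busy: run inline instead of queueing more tasks.
        Ok(op())
    }
}
```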

View File

@ -79,22 +79,29 @@ pub const FACET_MIN_LEVEL_SIZE: u8 = 5;
use std::collections::BTreeSet;
use std::fs::File;
use std::io::BufReader;
use std::ops::Bound;
use grenad::Merger;
use heed::types::{Bytes, DecodeIgnore};
use heed::BytesDecode as _;
use roaring::RoaringBitmap;
use time::OffsetDateTime;
use tracing::debug;
use self::incremental::FacetsUpdateIncremental;
use super::{FacetsUpdateBulk, MergeDeladdBtreesetString, MergeDeladdCboRoaringBitmaps};
use crate::facet::FacetType;
use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec, FacetGroupValueCodec};
use crate::heed_codec::facet::{
FacetGroupKey, FacetGroupKeyCodec, FacetGroupValueCodec, OrderedF64Codec,
};
use crate::heed_codec::BytesRefCodec;
use crate::search::facet::get_highest_level;
use crate::update::del_add::{DelAdd, KvReaderDelAdd};
use crate::{try_split_array_at, FieldId, Index, Result};
pub mod bulk;
pub mod incremental;
pub mod new_incremental;
/// A builder used to add new elements to the `facet_id_string_docids` or `facet_id_f64_docids` databases.
///
@ -646,3 +653,194 @@ mod comparison_bench {
}
}
}
/// Run sanity checks on the specified fid tree
///
/// 1. No "orphan" child value, any child value has a parent
/// 2. Any docid in the child appears in the parent
/// 3. No docid in the parent is missing from all its children
/// 4. no group is bigger than max_group_size
/// 5. Less than 50% of groups are bigger than group_size
/// 6. group size matches the number of children
/// 7. max_level is < 255
pub(crate) fn sanity_checks(
index: &Index,
rtxn: &heed::RoTxn,
field_id: FieldId,
facet_type: FacetType,
group_size: usize,
_min_level_size: usize, // might add a check on level size later
max_group_size: usize,
) -> Result<()> {
tracing::info!(%field_id, ?facet_type, "performing sanity checks");
let database = match facet_type {
FacetType::String => {
index.facet_id_string_docids.remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>()
}
FacetType::Number => {
index.facet_id_f64_docids.remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>()
}
};
let leaf_prefix: FacetGroupKey<&[u8]> = FacetGroupKey { field_id, level: 0, left_bound: &[] };
let leaf_it = database.prefix_iter(rtxn, &leaf_prefix)?;
let max_level = get_highest_level(rtxn, database, field_id)?;
if max_level == u8::MAX {
panic!("max_level == 255");
}
for leaf in leaf_it {
let (leaf_facet_value, leaf_docids) = leaf?;
let mut current_level = 0;
let mut current_parent_facet_value: Option<FacetGroupKey<&[u8]>> = None;
let mut current_parent_docids: Option<crate::heed_codec::facet::FacetGroupValue> = None;
loop {
current_level += 1;
if current_level >= max_level {
break;
}
let parent_key_right_bound = FacetGroupKey {
field_id,
level: current_level,
left_bound: leaf_facet_value.left_bound,
};
let (parent_facet_value, parent_docids) = database
.get_lower_than_or_equal_to(rtxn, &parent_key_right_bound)?
.expect("no parent found");
if parent_facet_value.level != current_level {
panic!(
"wrong parent level, found_level={}, expected_level={}",
parent_facet_value.level, current_level
);
}
if parent_facet_value.field_id != field_id {
panic!("wrong parent fid");
}
if parent_facet_value.left_bound > leaf_facet_value.left_bound {
panic!("wrong parent left bound");
}
if !leaf_docids.bitmap.is_subset(&parent_docids.bitmap) {
panic!(
"missing docids from leaf in parent, current_level={}, parent={}, child={}, missing={missing:?}, child_len={}, child={:?}",
current_level,
facet_to_string(parent_facet_value.left_bound, facet_type),
facet_to_string(leaf_facet_value.left_bound, facet_type),
leaf_docids.bitmap.len(),
leaf_docids.bitmap.clone(),
missing=leaf_docids.bitmap - parent_docids.bitmap,
)
}
if let Some(current_parent_facet_value) = current_parent_facet_value {
if current_parent_facet_value.field_id != parent_facet_value.field_id {
panic!("wrong parent parent fid");
}
if current_parent_facet_value.level + 1 != parent_facet_value.level {
panic!("wrong parent parent level");
}
if current_parent_facet_value.left_bound < parent_facet_value.left_bound {
panic!("wrong parent parent left bound");
}
}
if let Some(current_parent_docids) = current_parent_docids {
if !current_parent_docids.bitmap.is_subset(&parent_docids.bitmap) {
panic!("missing docids from intermediate node in parent, parent_level={}, parent={}, intermediate={}, missing={missing:?}, intermediate={:?}",
parent_facet_value.level,
facet_to_string(parent_facet_value.left_bound, facet_type),
facet_to_string(current_parent_facet_value.unwrap().left_bound, facet_type),
current_parent_docids.bitmap.clone(),
missing=current_parent_docids.bitmap - parent_docids.bitmap,
);
}
}
current_parent_facet_value = Some(parent_facet_value);
current_parent_docids = Some(parent_docids);
}
}
tracing::info!(%field_id, ?facet_type, "checked all leaves");
let mut current_level = max_level;
let mut greater_than_group = 0usize;
let mut total = 0usize;
loop {
if current_level == 0 {
break;
}
let child_level = current_level - 1;
tracing::info!(%field_id, ?facet_type, %current_level, "checked groups for level");
let level_groups_prefix: FacetGroupKey<&[u8]> =
FacetGroupKey { field_id, level: current_level, left_bound: &[] };
let mut level_groups_it = database.prefix_iter(rtxn, &level_groups_prefix)?.peekable();
'group_it: loop {
let Some(group) = level_groups_it.next() else { break 'group_it };
let (group_facet_value, group_docids) = group?;
let child_left_bound = group_facet_value.left_bound.to_owned();
let mut expected_docids = RoaringBitmap::new();
let mut expected_size = 0usize;
let right_bound = level_groups_it
.peek()
.and_then(|res| res.as_ref().ok())
.map(|(key, _)| key.left_bound);
let child_left_bound = FacetGroupKey {
field_id,
level: child_level,
left_bound: child_left_bound.as_slice(),
};
let child_left_bound = Bound::Included(&child_left_bound);
let child_right_bound;
let child_right_bound = if let Some(right_bound) = right_bound {
child_right_bound =
FacetGroupKey { field_id, level: child_level, left_bound: right_bound };
Bound::Excluded(&child_right_bound)
} else {
Bound::Unbounded
};
let children = database.range(rtxn, &(child_left_bound, child_right_bound))?;
for child in children {
let (child_facet_value, child_docids) = child?;
if child_facet_value.field_id != field_id {
break;
}
if child_facet_value.level != child_level {
break;
}
expected_size += 1;
expected_docids |= &child_docids.bitmap;
}
assert_eq!(expected_size, group_docids.size as usize);
assert!(expected_size <= max_group_size);
assert_eq!(expected_docids, group_docids.bitmap);
total += 1;
if expected_size > group_size {
greater_than_group += 1;
}
}
current_level -= 1;
}
if greater_than_group * 2 > total {
panic!("too many groups have a size > group_size");
}
tracing::info!("sanity checks OK");
Ok(())
}
fn facet_to_string(facet_value: &[u8], facet_type: FacetType) -> String {
match facet_type {
FacetType::String => bstr::BStr::new(facet_value).to_string(),
FacetType::Number => match OrderedF64Codec::bytes_decode(facet_value) {
Ok(value) => value.to_string(),
Err(e) => format!("error: {e} (bytes: {facet_value:?}"),
},
}
}

View File

@ -0,0 +1,498 @@
use std::ops::Bound;
use heed::types::{Bytes, DecodeIgnore};
use heed::{BytesDecode as _, Database, RwTxn};
use roaring::RoaringBitmap;
use crate::facet::FacetType;
use crate::heed_codec::facet::{
FacetGroupKey, FacetGroupKeyCodec, FacetGroupValue, FacetGroupValueCodec,
};
use crate::heed_codec::BytesRefCodec;
use crate::search::facet::get_highest_level;
use crate::update::valid_facet_value;
use crate::{FieldId, Index, Result};
pub struct FacetsUpdateIncremental {
inner: FacetsUpdateIncrementalInner,
delta_data: Vec<FacetFieldIdChange>,
}
struct FacetsUpdateIncrementalInner {
db: Database<FacetGroupKeyCodec<BytesRefCodec>, FacetGroupValueCodec>,
field_id: FieldId,
group_size: u8,
min_level_size: u8,
max_group_size: u8,
}
impl FacetsUpdateIncremental {
pub fn new(
index: &Index,
facet_type: FacetType,
field_id: FieldId,
delta_data: Vec<FacetFieldIdChange>,
group_size: u8,
min_level_size: u8,
max_group_size: u8,
) -> Self {
FacetsUpdateIncremental {
inner: FacetsUpdateIncrementalInner {
db: match facet_type {
FacetType::String => index
.facet_id_string_docids
.remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>(),
FacetType::Number => index
.facet_id_f64_docids
.remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>(),
},
field_id,
group_size,
min_level_size,
max_group_size,
},
delta_data,
}
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing::facets::incremental")]
pub fn execute(mut self, wtxn: &mut RwTxn) -> Result<()> {
if self.delta_data.is_empty() {
return Ok(());
}
self.delta_data.sort_unstable_by(
|FacetFieldIdChange { facet_value: left, .. },
FacetFieldIdChange { facet_value: right, .. }| {
left.cmp(right)
// sort in **reverse** lexicographic order
.reverse()
},
);
self.inner.find_changed_parents(wtxn, self.delta_data)?;
self.inner.add_or_delete_level(wtxn)
}
}
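// Sketch of the required ordering (illustrative test, not part of the original
// change): reverse lexicographic order means greater facet values are visited
// first, which lets `find_changed_parents` keep reusing `last_parent` for every
// subsequent child that still sorts at or after that parent's left bound.
#[cfg(test)]
#[test]
fn reverse_lexicographic_order_sketch() {
    let mut values: Vec<&[u8]> = vec![&b"aaa"[..], &b"ab"[..], &b"abc"[..]];
    values.sort_unstable_by(|left, right| left.cmp(right).reverse());
    assert_eq!(values, [&b"abc"[..], &b"ab"[..], &b"aaa"[..]]);
}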
impl FacetsUpdateIncrementalInner {
/// WARNING: `changed_children` must be sorted in **reverse** lexicographic order.
fn find_changed_parents(
&self,
wtxn: &mut RwTxn,
mut changed_children: Vec<FacetFieldIdChange>,
) -> Result<()> {
let mut changed_parents = vec![];
for child_level in 0u8..u8::MAX {
// child_level < u8::MAX by construction
let parent_level = child_level + 1;
let parent_level_left_bound: FacetGroupKey<&[u8]> =
FacetGroupKey { field_id: self.field_id, level: parent_level, left_bound: &[] };
let mut last_parent: Option<Box<[u8]>> = None;
let mut child_it = changed_children
// drain all changed children
.drain(..)
// keep only children whose value is valid in the LMDB sense
.filter(|child| valid_facet_value(&child.facet_value));
// `while let` rather than `for` because we advance `child_it` inside of the loop
'current_level: while let Some(child) = child_it.next() {
if let Some(last_parent) = &last_parent {
if &child.facet_value >= last_parent {
self.compute_parent_group(wtxn, child_level, child.facet_value)?;
continue 'current_level;
}
}
// need to find a new parent
let parent_key_prefix = FacetGroupKey {
field_id: self.field_id,
level: parent_level,
left_bound: &*child.facet_value,
};
let parent = self
.db
.remap_data_type::<DecodeIgnore>()
.rev_range(
wtxn,
&(
Bound::Excluded(&parent_level_left_bound),
Bound::Included(&parent_key_prefix),
),
)?
.next();
match parent {
Some(Ok((parent_key, _parent_value))) => {
// found parent, cache it for next keys
last_parent = Some(parent_key.left_bound.to_owned().into_boxed_slice());
// add to modified list for parent level
changed_parents.push(FacetFieldIdChange {
facet_value: parent_key.left_bound.to_owned().into_boxed_slice(),
});
self.compute_parent_group(wtxn, child_level, child.facet_value)?;
}
Some(Err(err)) => return Err(err.into()),
None => {
// no parent for that key
let mut parent_it = self
.db
.remap_data_type::<DecodeIgnore>()
.prefix_iter_mut(wtxn, &parent_level_left_bound)?;
match parent_it.next() {
// 1. left of the current left bound, or
Some(Ok((first_key, _first_value))) => {
// make sure we don't spill on the neighboring fid (level also included defensively)
if first_key.field_id != self.field_id
|| first_key.level != parent_level
{
// max level reached, exit
drop(parent_it);
self.compute_parent_group(
wtxn,
child_level,
child.facet_value,
)?;
for child in child_it.by_ref() {
self.compute_parent_group(
wtxn,
child_level,
child.facet_value,
)?;
}
return Ok(());
}
// remove old left bound
unsafe { parent_it.del_current()? };
drop(parent_it);
changed_parents.push(FacetFieldIdChange {
facet_value: child.facet_value.clone(),
});
self.compute_parent_group(wtxn, child_level, child.facet_value)?;
// pop all elements in order to visit the new left bound
let new_left_bound =
&mut changed_parents.last_mut().unwrap().facet_value;
for child in child_it.by_ref() {
new_left_bound.clone_from(&child.facet_value);
self.compute_parent_group(
wtxn,
child_level,
child.facet_value,
)?;
}
}
Some(Err(err)) => return Err(err.into()),
// 2. max level reached, exit
None => {
drop(parent_it);
self.compute_parent_group(wtxn, child_level, child.facet_value)?;
for child in child_it.by_ref() {
self.compute_parent_group(
wtxn,
child_level,
child.facet_value,
)?;
}
return Ok(());
}
}
}
}
}
if changed_parents.is_empty() {
return Ok(());
}
drop(child_it);
std::mem::swap(&mut changed_children, &mut changed_parents);
// changed_parents is now empty because changed_children was emptied by the drain
}
Ok(())
}
fn compute_parent_group(
&self,
wtxn: &mut RwTxn<'_>,
parent_level: u8,
parent_left_bound: Box<[u8]>,
) -> Result<()> {
let mut range_left_bound: Vec<u8> = parent_left_bound.into();
if parent_level == 0 {
return Ok(());
}
let child_level = parent_level - 1;
let parent_key = FacetGroupKey {
field_id: self.field_id,
level: parent_level,
left_bound: &*range_left_bound,
};
let child_right_bound = self
.db
.remap_data_type::<DecodeIgnore>()
.get_greater_than(wtxn, &parent_key)?
.and_then(
|(
FacetGroupKey {
level: right_level,
field_id: right_fid,
left_bound: right_bound,
},
_,
)| {
if parent_level != right_level || self.field_id != right_fid {
// there was a greater key, but with a greater level or fid, so not a sibling to the parent: ignore
return None;
}
Some(right_bound.to_owned())
},
);
let child_right_bound = match &child_right_bound {
Some(right_bound) => Bound::Excluded(FacetGroupKey {
left_bound: right_bound.as_slice(),
field_id: self.field_id,
level: child_level,
}),
None => Bound::Unbounded,
};
let child_left_key = FacetGroupKey {
field_id: self.field_id,
level: child_level,
left_bound: &*range_left_bound,
};
let mut child_left_bound = Bound::Included(child_left_key);
loop {
// do a first pass on the range to find the number of children
let child_count = self
.db
.remap_data_type::<DecodeIgnore>()
.range(wtxn, &(child_left_bound, child_right_bound))?
.take(self.max_group_size as usize * 2)
.count();
let mut child_it = self.db.range(wtxn, &(child_left_bound, child_right_bound))?;
// pick the right group_size depending on the number of children
let group_size = if child_count >= self.max_group_size as usize * 2 {
// more than twice the max_group_size => there will be space for at least 2 groups of max_group_size
self.max_group_size as usize
} else if child_count >= self.group_size as usize {
// size in [group_size, max_group_size * 2[
// divided by 2, it falls in [group_size / 2, max_group_size[,
// which keeps the tree balanced
child_count / 2
} else {
// take everything
child_count
};
let res: Result<_> = child_it
.by_ref()
.take(group_size)
// stop if we go to the next level or field id
.take_while(|res| match res {
Ok((child_key, _)) => {
child_key.field_id == self.field_id && child_key.level == child_level
}
Err(_) => true,
})
.try_fold(
(None, FacetGroupValue { size: 0, bitmap: Default::default() }),
|(bounds, mut group_value), child_res| {
let (child_key, child_value) = child_res?;
let bounds = match bounds {
Some((left_bound, _)) => Some((left_bound, child_key.left_bound)),
None => Some((child_key.left_bound, child_key.left_bound)),
};
// max_group_size <= u8::MAX
group_value.size += 1;
group_value.bitmap |= &child_value.bitmap;
Ok((bounds, group_value))
},
);
let (bounds, group_value) = res?;
let Some((group_left_bound, right_bound)) = bounds else {
let update_key = FacetGroupKey {
field_id: self.field_id,
level: parent_level,
left_bound: &*range_left_bound,
};
drop(child_it);
if let Bound::Included(_) = child_left_bound {
self.db.delete(wtxn, &update_key)?;
}
break;
};
drop(child_it);
let current_left_bound = group_left_bound.to_owned();
let delete_old_bound = match child_left_bound {
Bound::Included(bound) => {
if bound.left_bound != current_left_bound {
Some(range_left_bound.clone())
} else {
None
}
}
_ => None,
};
range_left_bound.clear();
range_left_bound.extend_from_slice(right_bound);
let child_left_key = FacetGroupKey {
field_id: self.field_id,
level: child_level,
left_bound: range_left_bound.as_slice(),
};
child_left_bound = Bound::Excluded(child_left_key);
if let Some(old_bound) = delete_old_bound {
let update_key = FacetGroupKey {
field_id: self.field_id,
level: parent_level,
left_bound: old_bound.as_slice(),
};
self.db.delete(wtxn, &update_key)?;
}
let update_key = FacetGroupKey {
field_id: self.field_id,
level: parent_level,
left_bound: current_left_bound.as_slice(),
};
if group_value.bitmap.is_empty() {
self.db.delete(wtxn, &update_key)?;
} else {
self.db.put(wtxn, &update_key, &group_value)?;
}
}
Ok(())
}
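// Worked example of the group sizing above (illustrative values, assuming
// group_size = 4 and max_group_size = 8):
// - child_count = 20 (>= 2 * max_group_size = 16) => groups of max_group_size = 8;
// - child_count = 10 (in [group_size, 2 * max_group_size[) => groups of 10 / 2 = 5,
//   which falls in [group_size / 2, max_group_size[ and keeps the tree balanced;
// - child_count = 3 (< group_size) => a single group holding all 3 children.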
/// Check whether the size of the highest level has reached `min_level_size` * `self.group_size`.
/// If it has, we must build an additional level above it.
/// Then check whether the highest level is under `min_level_size`.
/// If it is, we must remove that level entirely.
pub(crate) fn add_or_delete_level(&self, txn: &mut RwTxn<'_>) -> Result<()> {
let highest_level = get_highest_level(txn, self.db, self.field_id)?;
let mut highest_level_prefix = vec![];
highest_level_prefix.extend_from_slice(&self.field_id.to_be_bytes());
highest_level_prefix.push(highest_level);
let size_highest_level =
self.db.remap_types::<Bytes, Bytes>().prefix_iter(txn, &highest_level_prefix)?.count();
if size_highest_level >= self.group_size as usize * self.min_level_size as usize {
self.add_level(txn, highest_level, &highest_level_prefix, size_highest_level)
} else if size_highest_level < self.min_level_size as usize && highest_level != 0 {
self.delete_level(txn, &highest_level_prefix)
} else {
Ok(())
}
}
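// Worked example for the thresholds above (illustrative values, assuming
// group_size = 4 and min_level_size = 5): a new level is built once the highest
// level holds at least 4 * 5 = 20 entries, and the highest level is deleted
// (unless it is level 0) as soon as it holds fewer than 5 entries.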
/// Delete a level.
fn delete_level(&self, txn: &mut RwTxn<'_>, highest_level_prefix: &[u8]) -> Result<()> {
let mut to_delete = vec![];
let mut iter =
self.db.remap_types::<Bytes, Bytes>().prefix_iter(txn, highest_level_prefix)?;
for el in iter.by_ref() {
let (k, _) = el?;
to_delete.push(
FacetGroupKeyCodec::<BytesRefCodec>::bytes_decode(k)
.map_err(heed::Error::Encoding)?
.into_owned(),
);
}
drop(iter);
for k in to_delete {
self.db.delete(txn, &k.as_ref())?;
}
Ok(())
}
/// Build an additional level for the field id.
fn add_level(
&self,
txn: &mut RwTxn<'_>,
highest_level: u8,
highest_level_prefix: &[u8],
size_highest_level: usize,
) -> Result<()> {
let mut groups_iter = self
.db
.remap_types::<Bytes, FacetGroupValueCodec>()
.prefix_iter(txn, highest_level_prefix)?;
let nbr_new_groups = size_highest_level / self.group_size as usize;
let nbr_leftover_elements = size_highest_level % self.group_size as usize;
let mut to_add = vec![];
for _ in 0..nbr_new_groups {
let mut first_key = None;
let mut values = RoaringBitmap::new();
for _ in 0..self.group_size {
let (key_bytes, value_i) = groups_iter.next().unwrap()?;
let key_i = FacetGroupKeyCodec::<BytesRefCodec>::bytes_decode(key_bytes)
.map_err(heed::Error::Encoding)?;
if first_key.is_none() {
first_key = Some(key_i);
}
values |= value_i.bitmap;
}
let key = FacetGroupKey {
field_id: self.field_id,
level: highest_level + 1,
left_bound: first_key.unwrap().left_bound,
};
let value = FacetGroupValue { size: self.group_size, bitmap: values };
to_add.push((key.into_owned(), value));
}
// now we add the rest of the level, in case its size is > group_size * min_level_size
// this can indeed happen if the min_level_size parameter changes between two calls to `insert`
if nbr_leftover_elements > 0 {
let mut first_key = None;
let mut values = RoaringBitmap::new();
for _ in 0..nbr_leftover_elements {
let (key_bytes, value_i) = groups_iter.next().unwrap()?;
let key_i = FacetGroupKeyCodec::<BytesRefCodec>::bytes_decode(key_bytes)
.map_err(heed::Error::Encoding)?;
if first_key.is_none() {
first_key = Some(key_i);
}
values |= value_i.bitmap;
}
let key = FacetGroupKey {
field_id: self.field_id,
level: highest_level + 1,
left_bound: first_key.unwrap().left_bound,
};
// Note: nbr_leftover_elements can be cast to a u8 since it is the remainder of a
// division by `self.group_size`, which is itself a u8.
let value = FacetGroupValue { size: nbr_leftover_elements as u8, bitmap: values };
to_add.push((key.into_owned(), value));
}
drop(groups_iter);
for (key, value) in to_add {
self.db.put(txn, &key.as_ref(), &value)?;
}
Ok(())
}
}
#[derive(Debug)]
pub struct FacetFieldIdChange {
pub facet_value: Box<[u8]>,
}

View File

@ -123,7 +123,8 @@ pub fn enrich_documents_batch<R: Read + Seek>(
}
}
let document_id = serde_json::to_vec(&document_id).map_err(InternalError::SerdeJson)?;
let document_id =
serde_json::to_vec(&document_id).map_err(InternalError::SerdeJson).unwrap();
external_ids.insert(count.to_be_bytes(), document_id)?;
count += 1;
@ -237,7 +238,7 @@ pub fn validate_geo_from_json(id: &DocumentId, bytes: &[u8]) -> Result<StdResult
let debug_id = || {
serde_json::from_slice(id.value().as_bytes()).unwrap_or_else(|_| Value::from(id.debug()))
};
match serde_json::from_slice(bytes).map_err(InternalError::SerdeJson)? {
match serde_json::from_slice(bytes).map_err(InternalError::SerdeJson).unwrap() {
Value::Object(mut object) => match (object.remove("lat"), object.remove("lng")) {
(Some(lat), Some(lng)) => {
match (extract_finite_float_from_value(lat), extract_finite_float_from_value(lng)) {

View File

@ -206,7 +206,7 @@ fn tokens_from_document<'a>(
if let Some(field_bytes) = KvReaderDelAdd::from_slice(field_bytes).get(del_add) {
// parse json.
let value =
serde_json::from_slice(field_bytes).map_err(InternalError::SerdeJson)?;
serde_json::from_slice(field_bytes).map_err(InternalError::SerdeJson).unwrap();
// prepare writing destination.
buffers.obkv_positions_buffer.clear();

View File

@ -10,10 +10,14 @@ use fst::{IntoStreamer, Streamer};
pub use grenad_helpers::*;
pub use merge_functions::*;
use crate::MAX_WORD_LENGTH;
use crate::MAX_LMDB_KEY_LENGTH;
pub fn valid_lmdb_key(key: impl AsRef<[u8]>) -> bool {
key.as_ref().len() <= MAX_WORD_LENGTH * 2 && !key.as_ref().is_empty()
key.as_ref().len() <= MAX_LMDB_KEY_LENGTH - 3 && !key.as_ref().is_empty()
}
pub fn valid_facet_value(facet_value: impl AsRef<[u8]>) -> bool {
facet_value.as_ref().len() <= MAX_LMDB_KEY_LENGTH - 3 && !facet_value.as_ref().is_empty()
}
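// Sketch of the expected behaviour (assuming MAX_LMDB_KEY_LENGTH is LMDB's
// default maximum key size of 511 bytes): a facet value is accepted only when
// it is non-empty and leaves room for the 3 bytes of metadata prefixed to it in
// the LMDB key.
#[cfg(test)]
#[test]
fn valid_facet_value_sketch() {
    assert!(valid_facet_value(b"dureuill"));
    assert!(!valid_facet_value(b""));
    assert!(!valid_facet_value([0u8; 512])); // longer than any LMDB key
}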
/// Divides one slice into two at an index, returns `None` if mid is out of bounds.

View File

@ -3334,6 +3334,44 @@ mod tests {
rtxn.commit().unwrap();
}
#[test]
fn incremental_update_without_changing_facet_distribution() {
let index = TempIndex::new();
index
.add_documents(documents!([
{"id": 0, "some_field": "aaa", "other_field": "aaa" },
{"id": 1, "some_field": "bbb", "other_field": "bbb" },
]))
.unwrap();
{
let rtxn = index.read_txn().unwrap();
// count field distribution
let results = index.field_distribution(&rtxn).unwrap();
assert_eq!(Some(&2), results.get("id"));
assert_eq!(Some(&2), results.get("some_field"));
assert_eq!(Some(&2), results.get("other_field"));
}
let mut index = index;
index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments;
index
.add_documents(documents!([
{"id": 0, "other_field": "bbb" },
{"id": 1, "some_field": "ccc" },
]))
.unwrap();
{
let rtxn = index.read_txn().unwrap();
// count field distribution
let results = index.field_distribution(&rtxn).unwrap();
assert_eq!(Some(&2), results.get("id"));
assert_eq!(Some(&2), results.get("some_field"));
assert_eq!(Some(&2), results.get("other_field"));
}
}
#[test]
fn delete_words_exact_attributes() {
let index = TempIndex::new();

View File

@ -1,5 +1,5 @@
---
source: milli/src/update/index_documents/mod.rs
source: crates/milli/src/update/index_documents/mod.rs
---
3 0 48.9021 1 [19, ]
3 0 49.9314 1 [17, ]
@ -15,6 +15,11 @@ source: milli/src/update/index_documents/mod.rs
3 0 50.7453 1 [7, ]
3 0 50.8466 1 [10, ]
3 0 51.0537 1 [9, ]
3 1 48.9021 2 [17, 19, ]
3 1 50.1793 3 [13, 14, 15, ]
3 1 50.4502 4 [0, 3, 8, 12, ]
3 1 50.6312 2 [1, 2, ]
3 1 50.7453 3 [7, 9, 10, ]
4 0 2.271 1 [17, ]
4 0 2.3708 1 [19, ]
4 0 2.7637 1 [14, ]
@ -28,4 +33,3 @@ source: milli/src/update/index_documents/mod.rs
4 0 3.6957 1 [9, ]
4 0 3.9623 1 [12, ]
4 0 4.337 1 [10, ]

View File

@ -27,6 +27,12 @@ use crate::update::new::KvReaderFieldId;
use crate::vector::Embedding;
use crate::{CboRoaringBitmapCodec, DocumentId, Error, Index, InternalError};
/// Note that the FrameProducer requires up to 9 bytes to
/// encode the length, the max grant has been computed accordingly.
///
/// <https://docs.rs/bbqueue/latest/bbqueue/framed/index.html#frame-header>
const MAX_FRAME_HEADER_SIZE: usize = 9;
/// Creates a tuple of senders/receiver to be used by
/// the extractors and the writer loop.
///
@ -53,8 +59,9 @@ pub fn extractor_writer_bbqueue(
bbbuffers.resize_with(current_num_threads, || BBBuffer::new(bbbuffer_capacity));
let capacity = bbbuffers.first().unwrap().capacity();
// Read the field description to understand this
let capacity = capacity.checked_sub(9).unwrap();
// 1. Due to fragmentation in the bbbuffer, we can only accept up to half the capacity in a single message.
// 2. Read the documentation for `MAX_FRAME_HEADER_SIZE` for more information about why it is here.
let max_grant = capacity.saturating_div(2).checked_sub(MAX_FRAME_HEADER_SIZE).unwrap();
let producers = ThreadLocal::with_capacity(bbbuffers.len());
let consumers = rayon::broadcast(|bi| {
@ -65,7 +72,7 @@ pub fn extractor_writer_bbqueue(
});
let (sender, receiver) = flume::bounded(channel_capacity);
let sender = ExtractorBbqueueSender { sender, producers, capacity };
let sender = ExtractorBbqueueSender { sender, producers, max_grant };
let receiver = WriterBbqueueReceiver {
receiver,
look_at_consumer: (0..consumers.len()).cycle(),
@ -81,13 +88,10 @@ pub struct ExtractorBbqueueSender<'a> {
/// A memory buffer, one by thread, is used to serialize
/// the entries directly in this shared, lock-free space.
producers: ThreadLocal<FullySend<RefCell<FrameProducer<'a>>>>,
/// The capacity of this frame producer, will never be able to store more than that.
///
/// Note that the FrameProducer requires up to 9 bytes to encode the length,
/// the capacity has been shrunk accordingly.
///
/// <https://docs.rs/bbqueue/latest/bbqueue/framed/index.html#frame-header>
capacity: usize,
/// The maximum frame grant that a producer can reserve.
/// It will never be able to store more than that as the
/// buffer cannot split data into two parts.
max_grant: usize,
}
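// Worked example (illustrative numbers): with a 100 MiB total bbbuffer budget
// split across 4 threads, each BBBuffer holds 25 MiB. Since a frame cannot be
// split across the end of the ring, a single grant can use at most half of that
// capacity, minus the up-to-9-byte frame header:
// max_grant = (25 MiB / 2) - MAX_FRAME_HEADER_SIZE = 13_107_200 - 9 = 13_107_191 bytes.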
pub struct WriterBbqueueReceiver<'a> {
@ -443,14 +447,14 @@ impl<'b> ExtractorBbqueueSender<'b> {
}
fn delete_vector(&self, docid: DocumentId) -> crate::Result<()> {
let capacity = self.capacity;
let max_grant = self.max_grant;
let refcell = self.producers.get().unwrap();
let mut producer = refcell.0.borrow_mut_or_yield();
let payload_header = EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid });
let total_length = EntryHeader::total_delete_vector_size();
if total_length > capacity {
panic!("The entry is larger ({total_length} bytes) than the BBQueue capacity ({capacity} bytes)");
if total_length > max_grant {
panic!("The entry is larger ({total_length} bytes) than the BBQueue max grant ({max_grant} bytes)");
}
// Spin loop to have a frame the size we requested.
@ -468,7 +472,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
embedder_id: u8,
embeddings: &[Vec<f32>],
) -> crate::Result<()> {
let capacity = self.capacity;
let max_grant = self.max_grant;
let refcell = self.producers.get().unwrap();
let mut producer = refcell.0.borrow_mut_or_yield();
@ -479,7 +483,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
let arroy_set_vector = ArroySetVectors { docid, embedder_id, _padding: [0; 3] };
let payload_header = EntryHeader::ArroySetVectors(arroy_set_vector);
let total_length = EntryHeader::total_set_vectors_size(embeddings.len(), dimensions);
if total_length > capacity {
if total_length > max_grant {
let mut value_file = tempfile::tempfile().map(BufWriter::new)?;
for embedding in embeddings {
let mut embedding_bytes = bytemuck::cast_slice(embedding);
@ -540,14 +544,14 @@ impl<'b> ExtractorBbqueueSender<'b> {
where
F: FnOnce(&mut [u8], &mut [u8]) -> crate::Result<()>,
{
let capacity = self.capacity;
let max_grant = self.max_grant;
let refcell = self.producers.get().unwrap();
let mut producer = refcell.0.borrow_mut_or_yield();
let operation = DbOperation { database, key_length: Some(key_length) };
let payload_header = EntryHeader::DbOperation(operation);
let total_length = EntryHeader::total_key_value_size(key_length, value_length);
if total_length > capacity {
if total_length > max_grant {
let mut key_buffer = vec![0; key_length.get() as usize].into_boxed_slice();
let value_file = tempfile::tempfile()?;
value_file.set_len(value_length.try_into().unwrap())?;
@ -601,7 +605,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
where
F: FnOnce(&mut [u8]) -> crate::Result<()>,
{
let capacity = self.capacity;
let max_grant = self.max_grant;
let refcell = self.producers.get().unwrap();
let mut producer = refcell.0.borrow_mut_or_yield();
@ -610,8 +614,8 @@ impl<'b> ExtractorBbqueueSender<'b> {
let operation = DbOperation { database, key_length: None };
let payload_header = EntryHeader::DbOperation(operation);
let total_length = EntryHeader::total_key_size(key_length);
if total_length > capacity {
panic!("The entry is larger ({total_length} bytes) than the BBQueue capacity ({capacity} bytes)");
if total_length > max_grant {
panic!("The entry is larger ({total_length} bytes) than the BBQueue max grant ({max_grant} bytes)");
}
// Spin loop to have a frame the size we requested.

View File

@ -86,7 +86,7 @@ impl<'t, Mapper: FieldIdMapper> Document<'t> for DocumentFromDb<'t, Mapper> {
let res = (|| {
let value =
serde_json::from_slice(value).map_err(crate::InternalError::SerdeJson)?;
serde_json::from_slice(value).map_err(crate::InternalError::SerdeJson).unwrap();
Ok((name, value))
})();
@ -139,7 +139,7 @@ impl<'t, Mapper: FieldIdMapper> DocumentFromDb<'t, Mapper> {
return Ok(None);
};
let Some(value) = self.content.get(fid) else { return Ok(None) };
Ok(Some(serde_json::from_slice(value).map_err(InternalError::SerdeJson)?))
Ok(Some(serde_json::from_slice(value).map_err(InternalError::SerdeJson).unwrap()))
}
}

View File

@ -89,7 +89,8 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
.or_default();
*entry -= 1;
}
let content = update.updated();
let content =
update.merged(&context.rtxn, context.index, &context.db_fields_ids_map)?;
let geo_iter =
content.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
for res in content.iter_top_level_fields().chain(geo_iter) {

View File

@ -283,42 +283,60 @@ impl FacetedDocidsExtractor {
}
struct DelAddFacetValue<'doc> {
strings: HashMap<(FieldId, BVec<'doc, u8>), DelAdd, hashbrown::DefaultHashBuilder, &'doc Bump>,
strings: HashMap<
(FieldId, &'doc str),
Option<BVec<'doc, u8>>,
hashbrown::DefaultHashBuilder,
&'doc Bump,
>,
f64s: HashMap<(FieldId, BVec<'doc, u8>), DelAdd, hashbrown::DefaultHashBuilder, &'doc Bump>,
doc_alloc: &'doc Bump,
}
impl<'doc> DelAddFacetValue<'doc> {
fn new(doc_alloc: &'doc Bump) -> Self {
Self { strings: HashMap::new_in(doc_alloc), f64s: HashMap::new_in(doc_alloc) }
Self { strings: HashMap::new_in(doc_alloc), f64s: HashMap::new_in(doc_alloc), doc_alloc }
}
fn insert_add(&mut self, fid: FieldId, value: BVec<'doc, u8>, kind: FacetKind) {
let cache = match kind {
FacetKind::String => &mut self.strings,
FacetKind::Number => &mut self.f64s,
_ => return,
};
let key = (fid, value);
if let Some(DelAdd::Deletion) = cache.get(&key) {
cache.remove(&key);
} else {
cache.insert(key, DelAdd::Addition);
match kind {
FacetKind::Number => {
let key = (fid, value);
if let Some(DelAdd::Deletion) = self.f64s.get(&key) {
self.f64s.remove(&key);
} else {
self.f64s.insert(key, DelAdd::Addition);
}
}
FacetKind::String => {
if let Ok(s) = std::str::from_utf8(&value) {
let normalized = crate::normalize_facet(s);
let truncated = self.doc_alloc.alloc_str(truncate_str(&normalized));
self.strings.insert((fid, truncated), Some(value));
}
}
_ => (),
}
}
fn insert_del(&mut self, fid: FieldId, value: BVec<'doc, u8>, kind: FacetKind) {
let cache = match kind {
FacetKind::String => &mut self.strings,
FacetKind::Number => &mut self.f64s,
_ => return,
};
let key = (fid, value);
if let Some(DelAdd::Addition) = cache.get(&key) {
cache.remove(&key);
} else {
cache.insert(key, DelAdd::Deletion);
match kind {
FacetKind::Number => {
let key = (fid, value);
if let Some(DelAdd::Addition) = self.f64s.get(&key) {
self.f64s.remove(&key);
} else {
self.f64s.insert(key, DelAdd::Deletion);
}
}
FacetKind::String => {
if let Ok(s) = std::str::from_utf8(&value) {
let normalized = crate::normalize_facet(s);
let truncated = self.doc_alloc.alloc_str(truncate_str(&normalized));
self.strings.insert((fid, truncated), None);
}
}
_ => (),
}
}
@ -329,18 +347,14 @@ impl<'doc> DelAddFacetValue<'doc> {
doc_alloc: &Bump,
) -> crate::Result<()> {
let mut buffer = bumpalo::collections::Vec::new_in(doc_alloc);
for ((fid, value), deladd) in self.strings {
if let Ok(s) = std::str::from_utf8(&value) {
buffer.clear();
buffer.extend_from_slice(&fid.to_be_bytes());
buffer.extend_from_slice(&docid.to_be_bytes());
let normalized = crate::normalize_facet(s);
let truncated = truncate_str(&normalized);
buffer.extend_from_slice(truncated.as_bytes());
match deladd {
DelAdd::Deletion => sender.delete_facet_string(&buffer)?,
DelAdd::Addition => sender.write_facet_string(&buffer, &value)?,
}
for ((fid, truncated), value) in self.strings {
buffer.clear();
buffer.extend_from_slice(&fid.to_be_bytes());
buffer.extend_from_slice(&docid.to_be_bytes());
buffer.extend_from_slice(truncated.as_bytes());
match &value {
Some(value) => sender.write_facet_string(&buffer, value)?,
None => sender.delete_facet_string(&buffer)?,
}
}
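// Sketch of why the map is keyed by the normalized, truncated value (assuming
// crate::normalize_facet lowercases, e.g. "DUREUILL" -> "dureuill"): a deletion
// of the old raw value and an addition of the new one now target the same entry,
// so the last operation wins and the shared normalized key cannot be deleted by
// mistake. Pseudo-calls for illustration:
//
//     insert_del(fid, b"DUREUILL", FacetKind::String); // keyed (fid, "dureuill") -> None (delete)
//     insert_add(fid, b"dureuill", FacetKind::String); // same key, overwritten -> Some(value) (write)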

View File

@ -27,7 +27,7 @@ pub fn extract_document_facets<'doc>(
let selection = perm_json_p::select_field(field_name, Some(attributes_to_extract), &[]);
if selection != perm_json_p::Selection::Skip {
// parse json.
match serde_json::value::to_value(value).map_err(InternalError::SerdeJson)? {
match serde_json::value::to_value(value).map_err(InternalError::SerdeJson).unwrap() {
Value::Object(object) => {
perm_json_p::seek_leaf_values_in_object(
&object,

View File

@ -256,15 +256,16 @@ pub fn extract_geo_coordinates(
external_id: &str,
raw_value: &RawValue,
) -> Result<Option<[f64; 2]>> {
let mut geo = match serde_json::from_str(raw_value.get()).map_err(InternalError::SerdeJson)? {
Value::Null => return Ok(None),
Value::Object(map) => map,
value => {
return Err(
GeoError::NotAnObject { document_id: Value::from(external_id), value }.into()
)
}
};
let mut geo =
match serde_json::from_str(raw_value.get()).map_err(InternalError::SerdeJson).unwrap() {
Value::Null => return Ok(None),
Value::Object(map) => map,
value => {
return Err(
GeoError::NotAnObject { document_id: Value::from(external_id), value }.into()
)
}
};
let [lat, lng] = match (geo.remove("lat"), geo.remove("lng")) {
(Some(lat), Some(lng)) => {

View File

@ -94,7 +94,7 @@ impl<'a> DocumentTokenizer<'a> {
};
// parse json.
match serde_json::to_value(value).map_err(InternalError::SerdeJson)? {
match serde_json::to_value(value).map_err(InternalError::SerdeJson).unwrap() {
Value::Object(object) => seek_leaf_values_in_object(
&object,
None,

View File

@ -158,7 +158,7 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
let mut previous_offset = 0;
let mut iter = Deserializer::from_slice(payload).into_iter::<&RawValue>();
while let Some(doc) = iter.next().transpose().map_err(InternalError::SerdeJson)? {
while let Some(doc) = iter.next().transpose().map_err(InternalError::SerdeJson).unwrap() {
*bytes = previous_offset as u64;
// Only guess the primary key if it is the first document
@ -252,6 +252,24 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
previous_offset = iter.byte_offset();
}
if payload.is_empty() {
let result = retrieve_or_guess_primary_key(
rtxn,
index,
new_fields_ids_map,
primary_key_from_op,
None,
);
match result {
Ok(Ok((pk, _))) => {
primary_key.get_or_insert(pk);
}
Ok(Err(UserError::NoPrimaryKeyCandidateFound)) => (),
Ok(Err(user_error)) => return Err(Error::UserError(user_error)),
Err(error) => return Err(error),
};
}
Ok(new_docids_version_offsets)
}

View File

@ -4,6 +4,7 @@ use std::sync::{OnceLock, RwLock};
use std::thread::{self, Builder};
use big_s::S;
use bstr::ByteSlice as _;
use bumparaw_collections::RawMap;
use document_changes::{extract, DocumentChanges, IndexingContext};
pub use document_deletion::DocumentDeletion;
@ -36,6 +37,8 @@ use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY};
use crate::progress::Progress;
use crate::proximity::ProximityPrecision;
use crate::update::del_add::DelAdd;
use crate::update::facet::new_incremental::FacetsUpdateIncremental;
use crate::update::facet::{FACET_GROUP_SIZE, FACET_MAX_GROUP_SIZE, FACET_MIN_LEVEL_SIZE};
use crate::update::new::extract::EmbeddingExtractor;
use crate::update::new::merger::merge_and_send_rtree;
use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids;
@ -90,24 +93,32 @@ where
..grenad_parameters
};
// We compute and remove the allocated BBQueues buffers capacity from the indexing memory.
let minimum_capacity = 50 * 1024 * 1024 * pool.current_num_threads(); // 50 MiB
// 5% of the allocated memory for the extractors, or at least 100MiB
// 5% of the allocated memory for the bbqueues, or at least 50MiB
//
// Minimum capacity for bbqueues
let minimum_total_bbbuffer_capacity = 50 * 1024 * 1024 * pool.current_num_threads(); // 50 MiB
let minimum_total_extractors_capacity = minimum_total_bbbuffer_capacity * 2;
let (grenad_parameters, total_bbbuffer_capacity) = grenad_parameters.max_memory.map_or(
(grenad_parameters, 2 * minimum_capacity), // 100 MiB by thread by default
(
GrenadParameters {
max_memory: Some(minimum_total_extractors_capacity),
..grenad_parameters
},
minimum_total_bbbuffer_capacity,
), // 100 MiB by thread by default
|max_memory| {
// 2% of the indexing memory
let total_bbbuffer_capacity = (max_memory / 100 / 2).max(minimum_capacity);
let total_bbbuffer_capacity = max_memory.max(minimum_total_bbbuffer_capacity);
let new_grenad_parameters = GrenadParameters {
max_memory: Some(
max_memory.saturating_sub(total_bbbuffer_capacity).max(100 * 1024 * 1024),
),
max_memory: Some(max_memory.max(minimum_total_extractors_capacity)),
..grenad_parameters
};
(new_grenad_parameters, total_bbbuffer_capacity)
},
);
let (extractor_sender, mut writer_receiver) = pool
let (extractor_sender, writer_receiver) = pool
.install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
.unwrap();
@ -203,6 +214,7 @@ where
caches,
FacetDatabases::new(index),
index,
&rtxn,
extractor_sender.facet_docids(),
)?;
}
@ -421,6 +433,7 @@ where
let mut arroy_writers = arroy_writers?;
{
let mut writer_receiver = writer_receiver;
let span = tracing::trace_span!(target: "indexing::write_db", "all");
let _entered = span.enter();
@ -580,7 +593,12 @@ fn write_from_bbqueue(
}
(key, None) => match database.delete(wtxn, key) {
Ok(false) => {
unreachable!("We tried to delete an unknown key: {key:?}")
tracing::error!(
database_name,
key_bytes = ?key,
formatted_key = ?key.as_bstr(),
"Attempt to delete an unknown key"
);
}
Ok(_) => (),
Err(error) => {
@ -735,27 +753,66 @@ fn compute_facet_search_database(
fn compute_facet_level_database(
index: &Index,
wtxn: &mut RwTxn,
facet_field_ids_delta: FacetFieldIdsDelta,
mut facet_field_ids_delta: FacetFieldIdsDelta,
) -> Result<()> {
if let Some(modified_facet_string_ids) = facet_field_ids_delta.modified_facet_string_ids() {
for (fid, delta) in facet_field_ids_delta.consume_facet_string_delta() {
let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string");
let _entered = span.enter();
FacetsUpdateBulk::new_not_updating_level_0(
index,
modified_facet_string_ids,
FacetType::String,
)
.execute(wtxn)?;
match delta {
super::merger::FacetFieldIdDelta::Bulk => {
tracing::debug!(%fid, "bulk string facet processing");
FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::String)
.execute(wtxn)?
}
super::merger::FacetFieldIdDelta::Incremental(delta_data) => {
tracing::debug!(%fid, len=%delta_data.len(), "incremental string facet processing");
FacetsUpdateIncremental::new(
index,
FacetType::String,
fid,
delta_data,
FACET_GROUP_SIZE,
FACET_MIN_LEVEL_SIZE,
FACET_MAX_GROUP_SIZE,
)
.execute(wtxn)?
}
}
}
if let Some(modified_facet_number_ids) = facet_field_ids_delta.modified_facet_number_ids() {
for (fid, delta) in facet_field_ids_delta.consume_facet_number_delta() {
let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number");
let _entered = span.enter();
FacetsUpdateBulk::new_not_updating_level_0(
match delta {
super::merger::FacetFieldIdDelta::Bulk => {
tracing::debug!(%fid, "bulk number facet processing");
FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::Number)
.execute(wtxn)?
}
super::merger::FacetFieldIdDelta::Incremental(delta_data) => {
tracing::debug!(%fid, len=%delta_data.len(), "incremental number facet processing");
FacetsUpdateIncremental::new(
index,
FacetType::Number,
fid,
delta_data,
FACET_GROUP_SIZE,
FACET_MIN_LEVEL_SIZE,
FACET_MAX_GROUP_SIZE,
)
.execute(wtxn)?
}
}
debug_assert!(crate::update::facet::sanity_checks(
index,
modified_facet_number_ids,
wtxn,
fid,
FacetType::Number,
FACET_GROUP_SIZE as usize,
FACET_MIN_LEVEL_SIZE as usize,
FACET_MAX_GROUP_SIZE as usize,
)
.execute(wtxn)?;
.is_ok());
}
Ok(())

View File

@ -78,7 +78,8 @@ where
let external_document_id = external_document_id.to_de();
let document = RawMap::from_raw_value_and_hasher(document, FxBuildHasher, doc_alloc)
.map_err(InternalError::SerdeJson)?;
.map_err(InternalError::SerdeJson)
.unwrap();
let insertion = Insertion::create(docid, external_document_id, Versions::single(document));
Ok(Some(DocumentChange::Insertion(insertion)))

View File

@ -58,9 +58,9 @@ impl UpdateByFunction {
let ast = engine.compile(code).map_err(UserError::DocumentEditionCompilationError)?;
let context = match context {
Some(context) => {
Some(serde_json::from_value(context.into()).map_err(InternalError::SerdeJson)?)
}
Some(context) => Some(
serde_json::from_value(context.into()).map_err(InternalError::SerdeJson).unwrap(),
),
None => None,
};
@ -137,9 +137,11 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
Some(new_rhai_document) => {
let mut buffer = bumpalo::collections::Vec::new_in(doc_alloc);
serde_json::to_writer(&mut buffer, &new_rhai_document)
.map_err(InternalError::SerdeJson)?;
.map_err(InternalError::SerdeJson)
.unwrap();
let raw_new_doc = serde_json::from_slice(buffer.into_bump_slice())
.map_err(InternalError::SerdeJson)?;
.map_err(InternalError::SerdeJson)
.unwrap();
// Note: This condition is not perfect. Sometimes it detect changes
// like with floating points numbers and consider updating
@ -166,7 +168,8 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
FxBuildHasher,
doc_alloc,
)
.map_err(InternalError::SerdeJson)?;
.map_err(InternalError::SerdeJson)
.unwrap();
Ok(Some(DocumentChange::Update(Update::create(
docid,
@ -200,7 +203,7 @@ fn obkv_to_rhaimap(obkv: &KvReaderFieldId, fields_ids_map: &FieldsIdsMap) -> Res
field_id: id,
process: "all_obkv_to_rhaimap",
})?;
let value = serde_json::from_slice(value).map_err(InternalError::SerdeJson)?;
let value = serde_json::from_slice(value).map_err(InternalError::SerdeJson).unwrap();
Ok((name.into(), value))
})
.collect();

View File

@ -1,6 +1,6 @@
use std::cell::RefCell;
use hashbrown::HashSet;
use hashbrown::HashMap;
use heed::types::Bytes;
use heed::{Database, RoTxn};
use memmap2::Mmap;
@ -12,6 +12,7 @@ use super::extract::{
merge_caches_sorted, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap,
FacetKind, GeoExtractorData,
};
use crate::update::facet::new_incremental::FacetFieldIdChange;
use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result};
#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
@ -100,12 +101,18 @@ pub fn merge_and_send_facet_docids<'extractor>(
mut caches: Vec<BalancedCaches<'extractor>>,
database: FacetDatabases,
index: &Index,
rtxn: &RoTxn,
docids_sender: FacetDocidsSender,
) -> Result<FacetFieldIdsDelta> {
let max_string_count = (index.facet_id_string_docids.len(rtxn)? / 500) as usize;
let max_number_count = (index.facet_id_f64_docids.len(rtxn)? / 500) as usize;
let max_string_count = max_string_count.clamp(1000, 100_000);
let max_number_count = max_number_count.clamp(1000, 100_000);
transpose_and_freeze_caches(&mut caches)?
.into_par_iter()
.map(|frozen| {
let mut facet_field_ids_delta = FacetFieldIdsDelta::default();
let mut facet_field_ids_delta =
FacetFieldIdsDelta::new(max_string_count, max_number_count);
let rtxn = index.read_txn()?;
merge_caches_sorted(frozen, |key, DelAddRoaringBitmap { del, add }| {
let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?;
@ -126,7 +133,10 @@ pub fn merge_and_send_facet_docids<'extractor>(
Ok(facet_field_ids_delta)
})
.reduce(|| Ok(FacetFieldIdsDelta::default()), |lhs, rhs| Ok(lhs?.merge(rhs?)))
.reduce(
|| Ok(FacetFieldIdsDelta::new(max_string_count, max_number_count)),
|lhs, rhs| Ok(lhs?.merge(rhs?)),
)
}
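// Worked example for the thresholds above: with 2_000_000 entries in
// facet_id_string_docids, max_string_count = 2_000_000 / 500 = 4_000, so a field
// id falls back from incremental to bulk level rebuilding once it accumulates
// more than 4_000 changed facet strings. Small databases are floored at 1_000
// changes and very large ones are capped at 100_000.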
pub struct FacetDatabases<'a> {
@ -155,60 +165,131 @@ impl<'a> FacetDatabases<'a> {
}
}
#[derive(Debug, Default)]
#[derive(Debug)]
pub enum FacetFieldIdDelta {
Bulk,
Incremental(Vec<FacetFieldIdChange>),
}
impl FacetFieldIdDelta {
fn push(&mut self, facet_value: &[u8], max_count: usize) {
*self = match std::mem::replace(self, FacetFieldIdDelta::Bulk) {
FacetFieldIdDelta::Bulk => FacetFieldIdDelta::Bulk,
FacetFieldIdDelta::Incremental(mut v) => {
if v.len() >= max_count {
FacetFieldIdDelta::Bulk
} else {
v.push(FacetFieldIdChange { facet_value: facet_value.into() });
FacetFieldIdDelta::Incremental(v)
}
}
}
}
fn merge(&mut self, rhs: Option<Self>, max_count: usize) {
let Some(rhs) = rhs else {
return;
};
*self = match (std::mem::replace(self, FacetFieldIdDelta::Bulk), rhs) {
(FacetFieldIdDelta::Bulk, _) | (_, FacetFieldIdDelta::Bulk) => FacetFieldIdDelta::Bulk,
(
FacetFieldIdDelta::Incremental(mut left),
FacetFieldIdDelta::Incremental(mut right),
) => {
if left.len() + right.len() >= max_count {
FacetFieldIdDelta::Bulk
} else {
left.append(&mut right);
FacetFieldIdDelta::Incremental(left)
}
}
};
}
}
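// Sketch of the escalation (hypothetical max_count of 2): a delta stays
// incremental while few facet values changed for the field, and degrades to a
// bulk rebuild once the threshold is crossed or once either side of a merge is
// already Bulk.
//
//     let mut delta = FacetFieldIdDelta::Incremental(Vec::new());
//     delta.push(b"blue", 2);  // Incremental, 1 change
//     delta.push(b"red", 2);   // Incremental, 2 changes
//     delta.push(b"green", 2); // len() >= max_count before pushing => Bulk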
#[derive(Debug)]
pub struct FacetFieldIdsDelta {
/// The field ids that have been modified
modified_facet_string_ids: HashSet<FieldId>,
modified_facet_number_ids: HashSet<FieldId>,
modified_facet_string_ids: HashMap<FieldId, FacetFieldIdDelta, rustc_hash::FxBuildHasher>,
modified_facet_number_ids: HashMap<FieldId, FacetFieldIdDelta, rustc_hash::FxBuildHasher>,
max_string_count: usize,
max_number_count: usize,
}
impl FacetFieldIdsDelta {
fn register_facet_string_id(&mut self, field_id: FieldId) {
self.modified_facet_string_ids.insert(field_id);
pub fn new(max_string_count: usize, max_number_count: usize) -> Self {
Self {
max_string_count,
max_number_count,
modified_facet_string_ids: Default::default(),
modified_facet_number_ids: Default::default(),
}
}
fn register_facet_number_id(&mut self, field_id: FieldId) {
self.modified_facet_number_ids.insert(field_id);
fn register_facet_string_id(&mut self, field_id: FieldId, facet_value: &[u8]) {
self.modified_facet_string_ids
.entry(field_id)
.or_insert(FacetFieldIdDelta::Incremental(Default::default()))
.push(facet_value, self.max_string_count);
}
fn register_facet_number_id(&mut self, field_id: FieldId, facet_value: &[u8]) {
self.modified_facet_number_ids
.entry(field_id)
.or_insert(FacetFieldIdDelta::Incremental(Default::default()))
.push(facet_value, self.max_number_count);
}
fn register_from_key(&mut self, key: &[u8]) {
let (facet_kind, field_id) = self.extract_key_data(key);
match facet_kind {
FacetKind::Number => self.register_facet_number_id(field_id),
FacetKind::String => self.register_facet_string_id(field_id),
let (facet_kind, field_id, facet_value) = self.extract_key_data(key);
match (facet_kind, facet_value) {
(FacetKind::Number, Some(facet_value)) => {
self.register_facet_number_id(field_id, facet_value)
}
(FacetKind::String, Some(facet_value)) => {
self.register_facet_string_id(field_id, facet_value)
}
_ => (),
}
}
fn extract_key_data(&self, key: &[u8]) -> (FacetKind, FieldId) {
fn extract_key_data<'key>(&self, key: &'key [u8]) -> (FacetKind, FieldId, Option<&'key [u8]>) {
let facet_kind = FacetKind::from(key[0]);
let field_id = FieldId::from_be_bytes([key[1], key[2]]);
(facet_kind, field_id)
let facet_value = if key.len() >= 4 {
// level is also stored in the key at [3] (always 0)
Some(&key[4..])
} else {
None
};
(facet_kind, field_id, facet_value)
}
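// Key layout assumed by `extract_key_data` (values always come from the
// extractors at level 0):
//   key[0]     facet kind (string or number)
//   key[1..3]  field id, big-endian u16
//   key[3]     level, always 0 at this point
//   key[4..]   the raw facet value bytes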
pub fn modified_facet_string_ids(&self) -> Option<Vec<FieldId>> {
if self.modified_facet_string_ids.is_empty() {
None
} else {
Some(self.modified_facet_string_ids.iter().copied().collect())
}
pub fn consume_facet_string_delta(
&mut self,
) -> impl Iterator<Item = (FieldId, FacetFieldIdDelta)> + '_ {
self.modified_facet_string_ids.drain()
}
pub fn modified_facet_number_ids(&self) -> Option<Vec<FieldId>> {
if self.modified_facet_number_ids.is_empty() {
None
} else {
Some(self.modified_facet_number_ids.iter().copied().collect())
}
pub fn consume_facet_number_delta(
&mut self,
) -> impl Iterator<Item = (FieldId, FacetFieldIdDelta)> + '_ {
self.modified_facet_number_ids.drain()
}
pub fn merge(mut self, rhs: Self) -> Self {
let Self { modified_facet_number_ids, modified_facet_string_ids } = rhs;
modified_facet_number_ids.into_iter().for_each(|fid| {
self.modified_facet_number_ids.insert(fid);
// rhs.max_xx_count is assumed to be equal to self.max_xx_count, so it is left unused
let Self { modified_facet_number_ids, modified_facet_string_ids, .. } = rhs;
modified_facet_number_ids.into_iter().for_each(|(fid, mut delta)| {
let old_delta = self.modified_facet_number_ids.remove(&fid);
delta.merge(old_delta, self.max_number_count);
self.modified_facet_number_ids.insert(fid, delta);
});
modified_facet_string_ids.into_iter().for_each(|fid| {
self.modified_facet_string_ids.insert(fid);
modified_facet_string_ids.into_iter().for_each(|(fid, mut delta)| {
let old_delta = self.modified_facet_string_ids.remove(&fid);
delta.merge(old_delta, self.max_string_count);
self.modified_facet_string_ids.insert(fid, delta);
});
self
}

View File

@ -16,6 +16,7 @@ pub mod indexer;
mod merger;
mod parallel_iterator_ext;
mod ref_cell_ext;
pub mod reindex;
pub(crate) mod steps;
pub(crate) mod thread_local;
pub mod vector_document;

View File

@ -0,0 +1,38 @@
use heed::RwTxn;
use super::document::{Document, DocumentFromDb};
use crate::progress::{self, AtomicSubStep, Progress};
use crate::{FieldDistribution, Index, Result};
pub fn field_distribution(index: &Index, wtxn: &mut RwTxn<'_>, progress: &Progress) -> Result<()> {
let mut distribution = FieldDistribution::new();
let document_count = index.number_of_documents(wtxn)?;
let field_id_map = index.fields_ids_map(wtxn)?;
let (update_document_count, sub_step) =
AtomicSubStep::<progress::Document>::new(document_count as u32);
progress.update_progress(sub_step);
let docids = index.documents_ids(wtxn)?;
for docid in docids {
update_document_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let Some(document) = DocumentFromDb::new(docid, wtxn, index, &field_id_map)? else {
continue;
};
let geo_iter = document.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv)));
for res in document.iter_top_level_fields().chain(geo_iter) {
let (field_name, _) = res?;
if let Some(count) = distribution.get_mut(field_name) {
*count += 1;
} else {
distribution.insert(field_name.to_owned(), 1);
}
}
}
index.put_field_distribution(wtxn, &distribution)?;
Ok(())
}

View File

@ -105,7 +105,8 @@ impl<'t> VectorDocumentFromDb<'t> {
let vectors_field = match vectors {
Some(vectors) => Some(
RawMap::from_raw_value_and_hasher(vectors, FxBuildHasher, doc_alloc)
.map_err(InternalError::SerdeJson)?,
.map_err(InternalError::SerdeJson)
.unwrap(),
),
None => None,
};

View File

@ -5,7 +5,7 @@ use rayon::slice::ParallelSlice as _;
use super::error::{EmbedError, EmbedErrorKind, NewEmbedderError, NewEmbedderErrorKind};
use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions};
use super::DistributionShift;
use super::{DistributionShift, REQUEST_PARALLELISM};
use crate::error::FaultSource;
use crate::vector::Embedding;
use crate::ThreadPoolNoAbort;
@ -98,14 +98,18 @@ impl Embedder {
text_chunks: Vec<Vec<String>>,
threads: &ThreadPoolNoAbort,
) -> Result<Vec<Vec<Embedding>>, EmbedError> {
threads
.install(move || {
text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
if threads.active_operations() >= REQUEST_PARALLELISM {
text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect()
} else {
threads
.install(move || {
text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
}
}
pub(crate) fn embed_chunks_ref(
@ -113,20 +117,30 @@ impl Embedder {
texts: &[&str],
threads: &ThreadPoolNoAbort,
) -> Result<Vec<Vec<f32>>, EmbedError> {
threads
.install(move || {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.par_chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed(chunk, None))
.collect();
if threads.active_operations() >= REQUEST_PARALLELISM {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed(chunk, None))
.collect();
let embeddings = embeddings?;
Ok(embeddings.into_iter().flatten().collect())
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
let embeddings = embeddings?;
Ok(embeddings.into_iter().flatten().collect())
} else {
threads
.install(move || {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.par_chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed(chunk, None))
.collect();
let embeddings = embeddings?;
Ok(embeddings.into_iter().flatten().collect())
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
}
}
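// Note on the dispatch above (sketch of the intent, not an authoritative
// description): when the pool already reports at least REQUEST_PARALLELISM
// active operations, the chunks are embedded sequentially on the calling
// thread rather than spawned as additional rayon tasks, so an already
// saturated pool is not flooded with more parallel embedding requests.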
pub fn chunk_count_hint(&self) -> usize {

View File

@ -6,7 +6,7 @@ use rayon::slice::ParallelSlice as _;
use super::error::{EmbedError, NewEmbedderError};
use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions};
use super::DistributionShift;
use super::{DistributionShift, REQUEST_PARALLELISM};
use crate::error::FaultSource;
use crate::vector::error::EmbedErrorKind;
use crate::vector::Embedding;
@ -255,14 +255,18 @@ impl Embedder {
text_chunks: Vec<Vec<String>>,
threads: &ThreadPoolNoAbort,
) -> Result<Vec<Vec<Embedding>>, EmbedError> {
threads
.install(move || {
text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
if threads.active_operations() >= REQUEST_PARALLELISM {
text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect()
} else {
threads
.install(move || {
text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
}
}
pub(crate) fn embed_chunks_ref(
@ -270,20 +274,29 @@ impl Embedder {
texts: &[&str],
threads: &ThreadPoolNoAbort,
) -> Result<Vec<Vec<f32>>, EmbedError> {
threads
.install(move || {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.par_chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed(chunk, None))
.collect();
if threads.active_operations() >= REQUEST_PARALLELISM {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed(chunk, None))
.collect();
let embeddings = embeddings?;
Ok(embeddings.into_iter().flatten().collect())
} else {
threads
.install(move || {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.par_chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed(chunk, None))
.collect();
let embeddings = embeddings?;
Ok(embeddings.into_iter().flatten().collect())
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
let embeddings = embeddings?;
Ok(embeddings.into_iter().flatten().collect())
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
}
}
pub fn chunk_count_hint(&self) -> usize {

View File

@ -188,14 +188,18 @@ impl Embedder {
text_chunks: Vec<Vec<String>>,
threads: &ThreadPoolNoAbort,
) -> Result<Vec<Vec<Embedding>>, EmbedError> {
threads
.install(move || {
text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None)).collect()
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
if threads.active_operations() >= REQUEST_PARALLELISM {
text_chunks.into_iter().map(move |chunk| self.embed(chunk, None)).collect()
} else {
threads
.install(move || {
text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None)).collect()
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
}
}
pub(crate) fn embed_chunks_ref(
@ -203,20 +207,30 @@ impl Embedder {
texts: &[&str],
threads: &ThreadPoolNoAbort,
) -> Result<Vec<Embedding>, EmbedError> {
threads
.install(move || {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.par_chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed_ref(chunk, None))
.collect();
if threads.active_operations() >= REQUEST_PARALLELISM {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed_ref(chunk, None))
.collect();
let embeddings = embeddings?;
Ok(embeddings.into_iter().flatten().collect())
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
let embeddings = embeddings?;
Ok(embeddings.into_iter().flatten().collect())
} else {
threads
.install(move || {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.par_chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed_ref(chunk, None))
.collect();
let embeddings = embeddings?;
Ok(embeddings.into_iter().flatten().collect())
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
}
}
pub fn chunk_count_hint(&self) -> usize {