Compare commits

...

46 Commits

Author SHA1 Message Date
Clément Renault
cf62af13e8 Merge pull request #6005 from meilisearch/clamp-max-batch-size
Clamp max batch size to 10 GiB
2025-11-20 10:45:23 +00:00
Many the fish
91cf94c196 Merge pull request #5999 from meilisearch/fix-document-fetch-sort
Fix the Document Fetch pagination bug when Sort is applied
2025-11-20 10:15:04 +00:00
Clément Renault
753ba39199 Update the documentation of the batch size 2025-11-20 10:33:02 +01:00
Clément Renault
3944c25853 Clamp the maximum batch size to maximum 10GiB 2025-11-20 10:29:50 +01:00
ManyTheFish
925bce5fbd Modify the test to test all the sort branches and fix the untested branch 2025-11-20 10:27:24 +01:00
ManyTheFish
62065ed30d Fix the pagination bug
where the last document of the previous page was duplicated as the first
document of the current page. This was due to a bug on the custom nth
function of the sort ranking rule skipping `n-1` documents instead of `n`
2025-11-20 10:27:24 +01:00
Clément Renault
97e6ae1957 Merge pull request #5994 from meilisearch/improve-s3-error-messages
Improve S3 upload by showing errors in the task queue
2025-11-19 16:58:02 +00:00
Clément Renault
5ed9be0789 Merge pull request #5990 from meilisearch/default-max-batch-size
Make the limit batched tasks total size defaults to half of the max indexing memory
2025-11-19 16:56:34 +00:00
Clément Renault
7597b1049f Merge pull request #6001 from meilisearch/update-windows-macos-ci
Update the macOS platform version in the CI
2025-11-19 16:12:52 +00:00
Clément Renault
d99150f21b Improve error message extraction
Co-authored-by: Many the fish <many@meilisearch.com>
2025-11-19 17:09:15 +01:00
Kerollmops
c9726674a0 Make the limit batched tasks total size default to half of max indexing
memory
2025-11-19 17:04:45 +01:00
Clément Renault
205f40b3b8 Update the macOS platform version to use version 14 2025-11-19 16:10:41 +01:00
Clément Renault
3d013cdebe Merge pull request #5995 from meilisearch/fix-embedding-skip
Fix embedding skip
2025-11-18 10:02:53 +00:00
Louis Dureuil
ddeff5678f Clippy happy 2025-11-17 14:48:40 +01:00
Louis Dureuil
a235434910 Add test 2025-11-17 13:52:23 +01:00
Louis Dureuil
a376525348 Do not skip embedding request for the document that exceeds capacity 2025-11-17 13:18:58 +01:00
Kerollmops
361580f451 Display the error message on failure 2025-11-17 09:21:18 +01:00
Clément Renault
ea70a7d1c9 Merge pull request #5969 from xuhongxu96/main
Remove unused dependency `allocator-api2`
2025-11-15 10:03:15 +00:00
Clément Renault
9304f8e586 Merge pull request #5991 from meilisearch/release-v1.26.0
Release v1.26.0
2025-11-13 17:54:01 +00:00
Louis Dureuil
495db080ec Upgrade snap 2025-11-13 17:52:34 +01:00
Louis Dureuil
d71341fa48 Suport upgrade to v1.26.0 2025-11-13 17:52:02 +01:00
Louis Dureuil
5b3070d8c3 Update version in toml and lock 2025-11-13 17:35:26 +01:00
Louis Dureuil
89006fd4b3 Merge pull request #5980 from hayatosc/feat/hugging-face-modernbert
Support ModernBERT architecture on `huggingface` embedder
2025-11-10 18:03:35 +00:00
Louis Dureuil
49f50a0a21 Don't collect the views 2025-11-10 17:55:44 +01:00
Louis Dureuil
1104f00803 happy clippy 2025-11-10 16:59:12 +01:00
Louis Dureuil
33fa564a9c rustfmt 2025-11-10 16:56:13 +01:00
Clément Renault
a097b254f8 Merge pull request #5963 from meilisearch/engprod-2128-allow-attaching-user-defined-metadata-to-tasks-and-return
Allow to attach `customMetadata` in the document addition or update tasks
2025-11-10 15:48:46 +00:00
Clément Renault
54cb0ec437 Merge pull request #5984 from meilisearch/embedder-error-modes
Embedder failure modes
2025-11-10 15:34:01 +00:00
Louis Dureuil
38ed1f1dbb Change parsing of environment variable 2025-11-10 15:08:24 +01:00
Clément Renault
643dd33358 Merge pull request #5982 from meilisearch/bump-meilisearch-v1.25.0
Bump meilisearch v1.25.0
2025-11-10 14:04:17 +00:00
Louis Dureuil
32f9fb6ab2 fix environment variable values 2025-11-10 14:54:25 +01:00
Louis Dureuil
b5966f82e8 Make max retry duration configurable with MEILI_EXPERIMENTAL_REST_EMBEDDER_MAX_RETRY_DURATION_SECONDS 2025-11-10 14:29:27 +01:00
Louis Dureuil
5e54063aab Configurable timeout with MEILI_EXPERIMENTAL_REST_EMBEDDER_TIMEOUT_SECONDS 2025-11-10 14:29:20 +01:00
Louis Dureuil
40456795d0 Allow to customize failure modes with MEILI_EXPERIMENTAL_CONFIG_EMBEDDER_FAILURE_MODES 2025-11-10 14:23:51 +01:00
ManyTheFish
40e60c6f52 Fix dumpless upgrade 2025-11-10 14:03:17 +01:00
ManyTheFish
eeae6383d0 Bump Meilisearch version v1.25.0 2025-11-10 14:03:17 +01:00
Hayato Sakaguchi
9f7172f6ab chage tensor names 2025-11-09 01:06:04 +09:00
Hayato Sakaguchi
d6eca83cfa Support modernbert architecture in hugging face embedder 2025-11-08 20:53:47 +09:00
Louis Dureuil
fc3508c8c8 Fix route path 2025-11-06 18:08:50 +01:00
Louis Dureuil
747476a225 Add customMetadata to all documents routes 2025-11-06 17:07:18 +01:00
Louis Dureuil
4b72e54ca7 Test webhook with metadata 2025-11-04 17:41:29 +01:00
Louis Dureuil
adef2cc132 test remote auto sharding with and without metadata 2025-11-04 17:41:28 +01:00
Louis Dureuil
533b9951b1 Allow adding custom metadata in tests 2025-11-04 17:41:28 +01:00
Louis Dureuil
9103cbc9db Add custom metadata to payload 2025-11-04 17:41:28 +01:00
Louis Dureuil
083de2bfc1 Allow to attach customMetadatain the document addition or update tasks 2025-11-04 17:41:28 +01:00
Hongxu Xu
08bc982748 Remove unused dependency allocator-api2 2025-11-04 03:29:24 +00:00
54 changed files with 1478 additions and 297 deletions

View File

@@ -65,9 +65,9 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [macos-13, windows-2022]
os: [macos-14, windows-2022]
include:
- os: macos-13
- os: macos-14
artifact_name: meilisearch
asset_name: meilisearch-macos-amd64
- os: windows-2022
@@ -90,7 +90,7 @@ jobs:
publish-macos-apple-silicon:
name: Publish binary for macOS silicon
runs-on: macos-13
runs-on: macos-14
needs: check-version
strategy:
matrix:

View File

@@ -47,7 +47,7 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [macos-13, windows-2022]
os: [macos-14, windows-2022]
steps:
- uses: actions/checkout@v5
- name: Cache dependencies

230
Cargo.lock generated
View File

@@ -310,6 +310,7 @@ dependencies = [
"const-random",
"getrandom 0.3.3",
"once_cell",
"serde",
"version_check",
"zerocopy",
]
@@ -344,12 +345,6 @@ version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923"
[[package]]
name = "allocator-api2"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c583acf993cf4245c4acb0a2cc2ab1f9cc097de73411bb6d3647ff6af2b1013d"
[[package]]
name = "anes"
version = "0.1.6"
@@ -492,7 +487,7 @@ dependencies = [
"backoff",
"base64 0.22.1",
"bytes",
"derive_builder 0.20.2",
"derive_builder",
"eventsource-stream",
"futures",
"rand 0.8.5",
@@ -589,7 +584,7 @@ source = "git+https://github.com/meilisearch/bbqueue#cbb87cc707b5af415ef203bdaf2
[[package]]
name = "benchmarks"
version = "1.24.0"
version = "1.26.0"
dependencies = [
"anyhow",
"bumpalo",
@@ -799,7 +794,7 @@ dependencies = [
[[package]]
name = "build-info"
version = "1.24.0"
version = "1.26.0"
dependencies = [
"anyhow",
"time",
@@ -812,7 +807,7 @@ version = "3.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43"
dependencies = [
"allocator-api2 0.2.21",
"allocator-api2",
"serde",
]
@@ -822,7 +817,7 @@ version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ce682bdc86c2e25ef5cd95881d9d6a1902214eddf74cf9ffea88fe1464377e8"
dependencies = [
"allocator-api2 0.2.21",
"allocator-api2",
"bitpacking",
"bumpalo",
"hashbrown 0.15.5",
@@ -945,7 +940,7 @@ dependencies = [
"rand 0.9.2",
"rand_distr",
"rayon",
"safetensors",
"safetensors 0.4.5",
"thiserror 1.0.69",
"ug",
"ug-cuda",
@@ -972,7 +967,7 @@ dependencies = [
"half",
"num-traits",
"rayon",
"safetensors",
"safetensors 0.4.5",
"serde",
"thiserror 1.0.69",
]
@@ -1052,6 +1047,15 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "castaway"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dec551ab6e7578819132c713a93c022a05d60159dc86e7a7050223577484c55a"
dependencies = [
"rustversion",
]
[[package]]
name = "cc"
version = "1.2.37"
@@ -1214,7 +1218,7 @@ dependencies = [
"anstream",
"anstyle",
"clap_lex",
"strsim 0.11.1",
"strsim",
]
[[package]]
@@ -1253,6 +1257,21 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
[[package]]
name = "compact_str"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdb1325a1cece981e8a296ab8f0f9b63ae357bd0784a9faaf548cc7b480707a"
dependencies = [
"castaway",
"cfg-if",
"itoa",
"rustversion",
"ryu",
"serde",
"static_assertions",
]
[[package]]
name = "concat-arrays"
version = "0.1.2"
@@ -1511,38 +1530,14 @@ dependencies = [
"libloading",
]
[[package]]
name = "darling"
version = "0.14.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b750cb3417fd1b327431a470f388520309479ab0bf5e323505daf0290cd3850"
dependencies = [
"darling_core 0.14.4",
"darling_macro 0.14.4",
]
[[package]]
name = "darling"
version = "0.20.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee"
dependencies = [
"darling_core 0.20.11",
"darling_macro 0.20.11",
]
[[package]]
name = "darling_core"
version = "0.14.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "109c1ca6e6b7f82cc233a97004ea8ed7ca123a9af07a8230878fcfda9b158bf0"
dependencies = [
"fnv",
"ident_case",
"proc-macro2",
"quote",
"strsim 0.10.0",
"syn 1.0.109",
"darling_core",
"darling_macro",
]
[[package]]
@@ -1555,28 +1550,17 @@ dependencies = [
"ident_case",
"proc-macro2",
"quote",
"strsim 0.11.1",
"strsim",
"syn 2.0.106",
]
[[package]]
name = "darling_macro"
version = "0.14.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4aab4dbc9f7611d8b55048a3a16d2d010c2c8334e46304b40ac1cc14bf3b48e"
dependencies = [
"darling_core 0.14.4",
"quote",
"syn 1.0.109",
]
[[package]]
name = "darling_macro"
version = "0.20.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead"
dependencies = [
"darling_core 0.20.11",
"darling_core",
"quote",
"syn 2.0.106",
]
@@ -1586,6 +1570,9 @@ name = "dary_heap"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04d2cd9c18b9f454ed67da600630b021a8a80bf33f8c95896ab33aaf1c26b728"
dependencies = [
"serde",
]
[[package]]
name = "deadpool"
@@ -1641,34 +1628,13 @@ dependencies = [
"syn 2.0.106",
]
[[package]]
name = "derive_builder"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d67778784b508018359cbc8696edb3db78160bab2c2a28ba7f56ef6932997f8"
dependencies = [
"derive_builder_macro 0.12.0",
]
[[package]]
name = "derive_builder"
version = "0.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "507dfb09ea8b7fa618fcf76e953f4f5e192547945816d5358edffe39f6f94947"
dependencies = [
"derive_builder_macro 0.20.2",
]
[[package]]
name = "derive_builder_core"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c11bdc11a0c47bc7d37d582b5285da6849c96681023680b906673c5707af7b0f"
dependencies = [
"darling 0.14.4",
"proc-macro2",
"quote",
"syn 1.0.109",
"derive_builder_macro",
]
[[package]]
@@ -1677,29 +1643,19 @@ version = "0.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d5bcf7b024d6835cfb3d473887cd966994907effbe9227e8c8219824d06c4e8"
dependencies = [
"darling 0.20.11",
"darling",
"proc-macro2",
"quote",
"syn 2.0.106",
]
[[package]]
name = "derive_builder_macro"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebcda35c7a396850a55ffeac740804b40ffec779b98fffbb1738f4033f0ee79e"
dependencies = [
"derive_builder_core 0.12.0",
"syn 1.0.109",
]
[[package]]
name = "derive_builder_macro"
version = "0.20.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab63b0e2bf4d5928aff72e83a7dace85d7bba5fe12dcc3c5a572d78caffd3f3c"
dependencies = [
"derive_builder_core 0.20.2",
"derive_builder_core",
"syn 2.0.106",
]
@@ -1738,7 +1694,7 @@ dependencies = [
"serde-cs",
"serde_json",
"serde_urlencoded",
"strsim 0.11.1",
"strsim",
]
[[package]]
@@ -1828,7 +1784,7 @@ dependencies = [
[[package]]
name = "dump"
version = "1.24.0"
version = "1.26.0"
dependencies = [
"anyhow",
"big_s",
@@ -2071,7 +2027,7 @@ checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]]
name = "file-store"
version = "1.24.0"
version = "1.26.0"
dependencies = [
"tempfile",
"thiserror 2.0.16",
@@ -2093,7 +2049,7 @@ dependencies = [
[[package]]
name = "filter-parser"
version = "1.24.0"
version = "1.26.0"
dependencies = [
"insta",
"levenshtein_automata",
@@ -2121,7 +2077,7 @@ dependencies = [
[[package]]
name = "flatten-serde-json"
version = "1.24.0"
version = "1.26.0"
dependencies = [
"criterion",
"serde_json",
@@ -2278,7 +2234,7 @@ dependencies = [
[[package]]
name = "fuzzers"
version = "1.24.0"
version = "1.26.0"
dependencies = [
"arbitrary",
"bumpalo",
@@ -2804,7 +2760,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
dependencies = [
"ahash 0.8.12",
"allocator-api2 0.2.21",
"allocator-api2",
]
[[package]]
@@ -2813,7 +2769,7 @@ version = "0.15.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
dependencies = [
"allocator-api2 0.2.21",
"allocator-api2",
"equivalent",
"foldhash",
"serde",
@@ -3232,7 +3188,7 @@ dependencies = [
[[package]]
name = "index-scheduler"
version = "1.24.0"
version = "1.26.0"
dependencies = [
"anyhow",
"backoff",
@@ -3245,7 +3201,7 @@ dependencies = [
"convert_case 0.8.0",
"crossbeam-channel",
"csv",
"derive_builder 0.20.2",
"derive_builder",
"dump",
"enum-iterator",
"file-store",
@@ -3412,15 +3368,6 @@ dependencies = [
"either",
]
[[package]]
name = "itertools"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569"
dependencies = [
"either",
]
[[package]]
name = "itertools"
version = "0.13.0"
@@ -3514,7 +3461,7 @@ dependencies = [
[[package]]
name = "json-depth-checker"
version = "1.24.0"
version = "1.26.0"
dependencies = [
"criterion",
"serde_json",
@@ -3765,7 +3712,7 @@ dependencies = [
"bincode 2.0.1",
"byteorder",
"csv",
"derive_builder 0.20.2",
"derive_builder",
"encoding",
"encoding_rs",
"encoding_rs_io",
@@ -4033,7 +3980,7 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "meili-snap"
version = "1.24.0"
version = "1.26.0"
dependencies = [
"insta",
"md5",
@@ -4044,7 +3991,7 @@ dependencies = [
[[package]]
name = "meilisearch"
version = "1.24.0"
version = "1.26.0"
dependencies = [
"actix-cors",
"actix-http",
@@ -4141,7 +4088,7 @@ dependencies = [
[[package]]
name = "meilisearch-auth"
version = "1.24.0"
version = "1.26.0"
dependencies = [
"base64 0.22.1",
"enum-iterator",
@@ -4160,7 +4107,7 @@ dependencies = [
[[package]]
name = "meilisearch-types"
version = "1.24.0"
version = "1.26.0"
dependencies = [
"actix-web",
"anyhow",
@@ -4195,7 +4142,7 @@ dependencies = [
[[package]]
name = "meilitool"
version = "1.24.0"
version = "1.26.0"
dependencies = [
"anyhow",
"clap",
@@ -4229,9 +4176,8 @@ dependencies = [
[[package]]
name = "milli"
version = "1.24.0"
version = "1.26.0"
dependencies = [
"allocator-api2 0.3.1",
"arroy",
"bbqueue",
"big_s",
@@ -4289,6 +4235,7 @@ dependencies = [
"roaring 0.10.12",
"rstar",
"rustc-hash 2.1.1",
"safetensors 0.6.2",
"serde",
"serde_json",
"slice-group-by",
@@ -4810,7 +4757,7 @@ checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220"
[[package]]
name = "permissive-json-pointer"
version = "1.24.0"
version = "1.26.0"
dependencies = [
"big_s",
"serde_json",
@@ -5399,12 +5346,12 @@ dependencies = [
[[package]]
name = "rayon-cond"
version = "0.3.0"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "059f538b55efd2309c9794130bc149c6a553db90e9d99c2030785c82f0bd7df9"
checksum = "2964d0cf57a3e7a06e8183d14a8b527195c706b7983549cd5462d5aa3747438f"
dependencies = [
"either",
"itertools 0.11.0",
"itertools 0.14.0",
"rayon",
]
@@ -5825,6 +5772,16 @@ dependencies = [
"serde_json",
]
[[package]]
name = "safetensors"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "172dd94c5a87b5c79f945c863da53b2ebc7ccef4eca24ac63cca66a41aab2178"
dependencies = [
"serde",
"serde_json",
]
[[package]]
name = "same-file"
version = "1.0.6"
@@ -6306,12 +6263,6 @@ dependencies = [
"indexmap",
]
[[package]]
name = "strsim"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "strsim"
version = "0.11.1"
@@ -6637,21 +6588,24 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokenizers"
version = "0.15.2"
source = "git+https://github.com/huggingface/tokenizers.git?tag=v0.15.2#701a73b869602b5639589d197e805349cdba3223"
version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6475a27088c98ea96d00b39a9ddfb63780d1ad4cceb6f48374349a96ab2b7842"
dependencies = [
"ahash 0.8.12",
"aho-corasick",
"derive_builder 0.12.0",
"compact_str",
"dary_heap",
"derive_builder",
"esaxx-rs",
"getrandom 0.2.16",
"itertools 0.12.1",
"lazy_static",
"getrandom 0.3.3",
"itertools 0.14.0",
"log",
"macro_rules_attribute",
"monostate",
"onig",
"paste",
"rand 0.8.5",
"rand 0.9.2",
"rayon",
"rayon-cond",
"regex",
@@ -6659,7 +6613,7 @@ dependencies = [
"serde",
"serde_json",
"spm_precompiled",
"thiserror 1.0.69",
"thiserror 2.0.16",
"unicode-normalization-alignments",
"unicode-segmentation",
"unicode_categories",
@@ -7021,7 +6975,7 @@ dependencies = [
"num-traits",
"num_cpus",
"rayon",
"safetensors",
"safetensors 0.4.5",
"serde",
"thiserror 1.0.69",
"tracing",
@@ -7251,7 +7205,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b2bf58be11fc9414104c6d3a2e464163db5ef74b12296bda593cac37b6e4777"
dependencies = [
"anyhow",
"derive_builder 0.20.2",
"derive_builder",
"rustversion",
"vergen-lib",
]
@@ -7263,7 +7217,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f6ee511ec45098eabade8a0750e76eec671e7fb2d9360c563911336bea9cac1"
dependencies = [
"anyhow",
"derive_builder 0.20.2",
"derive_builder",
"git2",
"rustversion",
"time",
@@ -7278,7 +7232,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b07e6010c0f3e59fcb164e0163834597da68d1f864e2b8ca49f74de01e9c166"
dependencies = [
"anyhow",
"derive_builder 0.20.2",
"derive_builder",
"rustversion",
]
@@ -7925,7 +7879,7 @@ dependencies = [
[[package]]
name = "xtask"
version = "1.24.0"
version = "1.26.0"
dependencies = [
"anyhow",
"build-info",

View File

@@ -23,7 +23,7 @@ members = [
]
[workspace.package]
version = "1.24.0"
version = "1.26.0"
authors = [
"Quentin de Quelen <quentin@dequelen.me>",
"Clément Renault <clement@meilisearch.com>",

View File

@@ -96,6 +96,8 @@ pub struct TaskDump {
pub finished_at: Option<OffsetDateTime>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub network: Option<TaskNetwork>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub custom_metadata: Option<String>,
}
// A `Kind` specific version made for the dump. If modified you may break the dump.
@@ -178,6 +180,7 @@ impl From<Task> for TaskDump {
started_at: task.started_at,
finished_at: task.finished_at,
network: task.network,
custom_metadata: task.custom_metadata,
}
}
}
@@ -396,6 +399,7 @@ pub(crate) mod test {
started_at: Some(datetime!(2022-11-20 0:00 UTC)),
finished_at: Some(datetime!(2022-11-21 0:00 UTC)),
network: None,
custom_metadata: None,
},
None,
),
@@ -421,6 +425,7 @@ pub(crate) mod test {
started_at: None,
finished_at: None,
network: None,
custom_metadata: None,
},
Some(vec![
json!({ "id": 4, "race": "leonberg" }).as_object().unwrap().clone(),
@@ -441,6 +446,7 @@ pub(crate) mod test {
started_at: None,
finished_at: None,
network: None,
custom_metadata: None,
},
None,
),

View File

@@ -164,6 +164,7 @@ impl CompatV5ToV6 {
started_at: task_view.started_at,
finished_at: task_view.finished_at,
network: None,
custom_metadata: None,
};
(task, content_file)

View File

@@ -150,6 +150,7 @@ impl<'a> Dump<'a> {
details: task.details,
status: task.status,
network: task.network,
custom_metadata: task.custom_metadata,
kind: match task.kind {
KindDump::DocumentImport {
primary_key,

View File

@@ -232,6 +232,7 @@ pub fn snapshot_task(task: &Task) -> String {
status,
kind,
network,
custom_metadata,
} = task;
snap.push('{');
snap.push_str(&format!("uid: {uid}, "));
@@ -252,6 +253,9 @@ pub fn snapshot_task(task: &Task) -> String {
if let Some(network) = network {
snap.push_str(&format!("network: {network:?}, "))
}
if let Some(custom_metadata) = custom_metadata {
snap.push_str(&format!("custom_metadata: {custom_metadata:?}"))
}
snap.push('}');
snap

View File

@@ -756,6 +756,19 @@ impl IndexScheduler {
kind: KindWithContent,
task_id: Option<TaskId>,
dry_run: bool,
) -> Result<Task> {
self.register_with_custom_metadata(kind, task_id, None, dry_run)
}
/// Register a new task in the scheduler, with metadata.
///
/// If it fails and data was associated with the task, it tries to delete the associated data.
pub fn register_with_custom_metadata(
&self,
kind: KindWithContent,
task_id: Option<TaskId>,
custom_metadata: Option<String>,
dry_run: bool,
) -> Result<Task> {
// if the task doesn't delete or cancel anything and 40% of the task queue is full, we must refuse to enqueue the incoming task
if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } | KindWithContent::TaskCancelation { tasks, .. } if !tasks.is_empty())
@@ -766,7 +779,7 @@ impl IndexScheduler {
}
let mut wtxn = self.env.write_txn()?;
let task = self.queue.register(&mut wtxn, &kind, task_id, dry_run)?;
let task = self.queue.register(&mut wtxn, &kind, task_id, custom_metadata, dry_run)?;
// If the registered task is a task cancelation
// we inform the processing tasks to stop (if necessary).

View File

@@ -257,6 +257,7 @@ impl Queue {
wtxn: &mut RwTxn,
kind: &KindWithContent,
task_id: Option<TaskId>,
custom_metadata: Option<String>,
dry_run: bool,
) -> Result<Task> {
let next_task_id = self.tasks.next_task_id(wtxn)?;
@@ -280,6 +281,7 @@ impl Queue {
status: Status::Enqueued,
kind: kind.clone(),
network: None,
custom_metadata,
};
// For deletion and cancelation tasks, we want to make extra sure that they
// don't attempt to delete/cancel tasks that are newer than themselves.
@@ -344,6 +346,7 @@ impl Queue {
tasks: to_delete,
},
None,
None,
false,
)?;

View File

@@ -438,12 +438,15 @@ async fn multipart_stream_to_s3(
db_name: String,
reader: std::io::PipeReader,
) -> Result<(), Error> {
use std::{collections::VecDeque, os::fd::OwnedFd, path::PathBuf};
use std::collections::VecDeque;
use std::io;
use std::os::fd::OwnedFd;
use std::path::PathBuf;
use bytes::{Bytes, BytesMut};
use reqwest::{Client, Response};
use rusty_s3::S3Action as _;
use rusty_s3::{actions::CreateMultipartUpload, Bucket, BucketError, Credentials, UrlStyle};
use rusty_s3::actions::CreateMultipartUpload;
use rusty_s3::{Bucket, BucketError, Credentials, S3Action as _, UrlStyle};
use tokio::task::JoinHandle;
let reader = OwnedFd::from(reader);
@@ -517,7 +520,6 @@ async fn multipart_stream_to_s3(
while buffer.len() < (s3_multipart_part_size as usize / 2) {
// Wait for the pipe to be readable
use std::io;
reader.readable().await?;
match reader.try_read_buf(&mut buffer) {
@@ -581,15 +583,17 @@ async fn multipart_stream_to_s3(
async move {
match client.post(url).body(body).send().await {
Ok(resp) if resp.status().is_client_error() => {
resp.error_for_status().map_err(backoff::Error::Permanent)
Err(backoff::Error::Permanent(Error::S3Error {
status: resp.status(),
body: resp.text().await.unwrap_or_default(),
}))
}
Ok(resp) => Ok(resp),
Err(e) => Err(backoff::Error::transient(e)),
Err(e) => Err(backoff::Error::transient(Error::S3HttpError(e))),
}
}
})
.await
.map_err(Error::S3HttpError)?;
.await?;
let status = resp.status();
let body = resp.text().await.map_err(|e| Error::S3Error { status, body: e.to_string() })?;

View File

@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 0, status: succeeded, details: { from: (1, 12, 0), to: (1, 24, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, batch_uid: 0, status: succeeded, details: { from: (1, 12, 0), to: (1, 26, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
1 {uid: 1, batch_uid: 1, status: succeeded, details: { primary_key: Some("mouse"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
2 {uid: 2, batch_uid: 2, status: succeeded, details: { primary_key: Some("bone"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
3 {uid: 3, batch_uid: 3, status: failed, error: ResponseError { code: 200, message: "Index `doggo` already exists.", error_code: "index_already_exists", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_already_exists" }, details: { primary_key: Some("bone"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
@@ -57,7 +57,7 @@ girafo: { number_of_documents: 0, field_distribution: {} }
[timestamp] [4,]
----------------------------------------------------------------------
### All Batches:
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.24.0"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.26.0"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
1 {uid: 1, details: {"primaryKey":"mouse"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"indexCreation":1},"indexUids":{"catto":1}}, stop reason: "created batch containing only task with id 1 of type `indexCreation` that cannot be batched with any other task.", }
2 {uid: 2, details: {"primaryKey":"bone"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"indexCreation":1},"indexUids":{"doggo":1}}, stop reason: "created batch containing only task with id 2 of type `indexCreation` that cannot be batched with any other task.", }
3 {uid: 3, details: {"primaryKey":"bone"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"indexCreation":1},"indexUids":{"doggo":1}}, stop reason: "created batch containing only task with id 3 of type `indexCreation` that cannot be batched with any other task.", }

View File

@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { from: (1, 12, 0), to: (1, 24, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, status: enqueued, details: { from: (1, 12, 0), to: (1, 26, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
----------------------------------------------------------------------
### Status:
enqueued [0,]

View File

@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { from: (1, 12, 0), to: (1, 24, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, status: enqueued, details: { from: (1, 12, 0), to: (1, 26, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
1 {uid: 1, status: enqueued, details: { primary_key: Some("mouse"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
----------------------------------------------------------------------
### Status:

View File

@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { from: (1, 12, 0), to: (1, 24, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { from: (1, 12, 0), to: (1, 26, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
1 {uid: 1, status: enqueued, details: { primary_key: Some("mouse"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
----------------------------------------------------------------------
### Status:
@@ -37,7 +37,7 @@ catto [1,]
[timestamp] [0,]
----------------------------------------------------------------------
### All Batches:
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.24.0"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.26.0"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
----------------------------------------------------------------------
### Batch to tasks mapping:
0 [0,]

View File

@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { from: (1, 12, 0), to: (1, 24, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { from: (1, 12, 0), to: (1, 26, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
1 {uid: 1, status: enqueued, details: { primary_key: Some("mouse"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
2 {uid: 2, status: enqueued, details: { primary_key: Some("bone"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
----------------------------------------------------------------------
@@ -40,7 +40,7 @@ doggo [2,]
[timestamp] [0,]
----------------------------------------------------------------------
### All Batches:
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.24.0"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.26.0"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
----------------------------------------------------------------------
### Batch to tasks mapping:
0 [0,]

View File

@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 0, status: succeeded, details: { from: (1, 12, 0), to: (1, 24, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, batch_uid: 0, status: succeeded, details: { from: (1, 12, 0), to: (1, 26, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
1 {uid: 1, status: enqueued, details: { primary_key: Some("mouse"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
2 {uid: 2, status: enqueued, details: { primary_key: Some("bone"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
3 {uid: 3, status: enqueued, details: { primary_key: Some("bone"), old_new_uid: None, new_index_uid: None }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
@@ -43,7 +43,7 @@ doggo [2,3,]
[timestamp] [0,]
----------------------------------------------------------------------
### All Batches:
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.24.0"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.26.0"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
----------------------------------------------------------------------
### Batch to tasks mapping:
0 [0,]

View File

@@ -48,6 +48,8 @@ pub fn upgrade_index_scheduler(
(1, 22, _) => 0,
(1, 23, _) => 0,
(1, 24, _) => 0,
(1, 25, _) => 0,
(1, 26, _) => 0,
(major, minor, patch) => {
if major > current_major
|| (major == current_major && minor > current_minor)
@@ -98,6 +100,7 @@ pub fn upgrade_index_scheduler(
status: Status::Enqueued,
kind: KindWithContent::UpgradeDatabase { from },
network: None,
custom_metadata: None,
},
)?;
wtxn.commit()?;

View File

@@ -379,6 +379,7 @@ impl crate::IndexScheduler {
status,
kind,
network: _,
custom_metadata: _,
} = task;
assert_eq!(uid, task.uid);
if task.status != Status::Enqueued {

View File

@@ -254,6 +254,7 @@ InvalidSearchHybridQuery , InvalidRequest , BAD_REQU
InvalidIndexLimit , InvalidRequest , BAD_REQUEST ;
InvalidIndexOffset , InvalidRequest , BAD_REQUEST ;
InvalidIndexPrimaryKey , InvalidRequest , BAD_REQUEST ;
InvalidIndexCustomMetadata , InvalidRequest , BAD_REQUEST ;
InvalidIndexUid , InvalidRequest , BAD_REQUEST ;
InvalidMultiSearchFacets , InvalidRequest , BAD_REQUEST ;
InvalidMultiSearchFacetsByIndex , InvalidRequest , BAD_REQUEST ;

View File

@@ -55,6 +55,9 @@ pub struct TaskView {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub network: Option<TaskNetwork>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub custom_metadata: Option<String>,
}
impl TaskView {
@@ -73,6 +76,7 @@ impl TaskView {
started_at: task.started_at,
finished_at: task.finished_at,
network: task.network.clone(),
custom_metadata: task.custom_metadata.clone(),
}
}
}

View File

@@ -45,6 +45,9 @@ pub struct Task {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub network: Option<TaskNetwork>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub custom_metadata: Option<String>,
}
impl Task {

View File

@@ -195,7 +195,7 @@ struct Infos {
experimental_enable_logs_route: bool,
experimental_reduce_indexing_memory_usage: bool,
experimental_max_number_of_batched_tasks: usize,
experimental_limit_batched_tasks_total_size: u64,
experimental_limit_batched_tasks_total_size: Option<u64>,
experimental_network: bool,
experimental_multimodal: bool,
experimental_chat_completions: bool,
@@ -359,7 +359,7 @@ impl Infos {
http_payload_size_limit,
experimental_max_number_of_batched_tasks,
experimental_limit_batched_tasks_total_size:
experimental_limit_batched_tasks_total_size.into(),
experimental_limit_batched_tasks_total_size.map(|size| size.as_u64()),
task_queue_webhook: task_webhook_url.is_some(),
task_webhook_authorization_header: task_webhook_authorization_header.is_some(),
log_level: log_level.to_string(),

View File

@@ -230,7 +230,17 @@ pub fn setup_meilisearch(
cleanup_enabled: !opt.experimental_replication_parameters,
max_number_of_tasks: 1_000_000,
max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks,
batched_tasks_size_limit: opt.experimental_limit_batched_tasks_total_size.into(),
batched_tasks_size_limit: opt.experimental_limit_batched_tasks_total_size.map_or_else(
|| {
opt.indexer_options
.max_indexing_memory
// By default, we use half of the available memory to determine the size of batched tasks
.map_or(u64::MAX, |mem| mem.as_u64() / 2)
// And never exceed 10 GiB when we infer the limit
.min(10 * 1024 * 1024 * 1024)
},
|size| size.as_u64(),
),
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().as_u64() as usize,
index_count: DEFAULT_INDEX_COUNT,
instance_features: opt.to_instance_features(),

View File

@@ -473,11 +473,14 @@ pub struct Opt {
#[serde(default = "default_limit_batched_tasks")]
pub experimental_max_number_of_batched_tasks: usize,
/// Experimentally reduces the maximum total size, in bytes, of tasks that will be processed at once,
/// see: <https://github.com/orgs/meilisearch/discussions/801>
#[clap(long, env = MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE, default_value_t = default_limit_batched_tasks_total_size())]
#[serde(default = "default_limit_batched_tasks_total_size")]
pub experimental_limit_batched_tasks_total_size: Byte,
/// Experimentally controls the maximum total size, in bytes, of tasks that will be processed
/// simultaneously. When unspecified, defaults to half of the maximum indexing memory and
/// clamped to 10 GiB.
///
/// See: <https://github.com/orgs/meilisearch/discussions/801>
#[clap(long, env = MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE)]
#[serde(default)]
pub experimental_limit_batched_tasks_total_size: Option<Byte>,
/// Enables experimental caching of search query embeddings. The value represents the maximal number of entries in the cache of each
/// distinct embedder.
@@ -701,10 +704,12 @@ impl Opt {
MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS,
experimental_max_number_of_batched_tasks.to_string(),
);
export_to_env_if_not_present(
MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE,
experimental_limit_batched_tasks_total_size.to_string(),
);
if let Some(limit) = experimental_limit_batched_tasks_total_size {
export_to_env_if_not_present(
MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE,
limit.to_string(),
);
}
export_to_env_if_not_present(
MEILI_EXPERIMENTAL_EMBEDDING_CACHE_ENTRIES,
experimental_embedding_cache_entries.to_string(),
@@ -1273,10 +1278,6 @@ fn default_limit_batched_tasks() -> usize {
usize::MAX
}
fn default_limit_batched_tasks_total_size() -> Byte {
Byte::from_u64(u64::MAX)
}
fn default_embedding_cache_entries() -> usize {
0
}

View File

@@ -1,14 +1,14 @@
use crate::search::{Personalize, SearchResult};
use meilisearch_types::{
error::{Code, ErrorCode, ResponseError},
milli::TimeBudget,
};
use std::time::Duration;
use meilisearch_types::error::{Code, ErrorCode, ResponseError};
use meilisearch_types::milli::TimeBudget;
use rand::Rng;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tracing::{debug, info, warn};
use crate::search::{Personalize, SearchResult};
const COHERE_API_URL: &str = "https://api.cohere.ai/v1/rerank";
const MAX_RETRIES: u32 = 10;

View File

@@ -333,10 +333,12 @@ impl Aggregate for DocumentsDeletionAggregator {
pub async fn delete_document(
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
path: web::Path<DocumentParam>,
params: AwebQueryParameter<CustomMetadataQuery, DeserrQueryParamError>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
let CustomMetadataQuery { custom_metadata } = params.into_inner();
let DocumentParam { index_uid, document_id } = path.into_inner();
let index_uid = IndexUid::try_from(index_uid)?;
let network = index_scheduler.network();
@@ -359,7 +361,10 @@ pub async fn delete_document(
let dry_run = is_dry_run(&req, &opt)?;
let task = {
let index_scheduler = index_scheduler.clone();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
tokio::task::spawn_blocking(move || {
index_scheduler.register_with_custom_metadata(task, uid, custom_metadata, dry_run)
})
.await??
};
if network.sharding && !dry_run {
@@ -678,6 +683,19 @@ pub struct UpdateDocumentsQuery {
#[param(value_type = char, default = ",", example = ";")]
#[deserr(default, try_from(char) = from_char_csv_delimiter -> DeserrQueryParamError<InvalidDocumentCsvDelimiter>, error = DeserrQueryParamError<InvalidDocumentCsvDelimiter>)]
pub csv_delimiter: Option<u8>,
#[param(example = "custom")]
#[deserr(default, error = DeserrQueryParamError<InvalidIndexCustomMetadata>)]
pub custom_metadata: Option<String>,
}
#[derive(Deserialize, Debug, Deserr, IntoParams)]
#[deserr(error = DeserrQueryParamError, rename_all = camelCase, deny_unknown_fields)]
#[into_params(parameter_in = Query, rename_all = "camelCase")]
pub struct CustomMetadataQuery {
#[param(example = "custom")]
#[deserr(default, error = DeserrQueryParamError<InvalidIndexCustomMetadata>)]
pub custom_metadata: Option<String>,
}
fn from_char_csv_delimiter(
@@ -819,6 +837,7 @@ pub async fn replace_documents(
body,
IndexDocumentsMethod::ReplaceDocuments,
uid,
params.custom_metadata,
dry_run,
allow_index_creation,
&req,
@@ -921,6 +940,7 @@ pub async fn update_documents(
body,
IndexDocumentsMethod::UpdateDocuments,
uid,
params.custom_metadata,
dry_run,
allow_index_creation,
&req,
@@ -940,6 +960,7 @@ async fn document_addition(
body: Payload,
method: IndexDocumentsMethod,
task_id: Option<TaskId>,
custom_metadata: Option<String>,
dry_run: bool,
allow_index_creation: bool,
req: &HttpRequest,
@@ -1065,8 +1086,10 @@ async fn document_addition(
};
let scheduler = index_scheduler.clone();
let task = match tokio::task::spawn_blocking(move || scheduler.register(task, task_id, dry_run))
.await?
let task = match tokio::task::spawn_blocking(move || {
scheduler.register_with_custom_metadata(task, task_id, custom_metadata, dry_run)
})
.await?
{
Ok(task) => task,
Err(e) => {
@@ -1130,7 +1153,7 @@ async fn copy_body_to_file(
/// Delete a set of documents based on an array of document ids.
#[utoipa::path(
post,
path = "{indexUid}/delete-batch",
path = "{indexUid}/documents/delete-batch",
tag = "Documents",
security(("Bearer" = ["documents.delete", "documents.*", "*"])),
params(
@@ -1161,11 +1184,14 @@ pub async fn delete_documents_batch(
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
index_uid: web::Path<String>,
body: web::Json<Vec<Value>>,
params: AwebQueryParameter<CustomMetadataQuery, DeserrQueryParamError>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?body, "Delete documents by batch");
let CustomMetadataQuery { custom_metadata } = params.into_inner();
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
let network = index_scheduler.network();
@@ -1190,7 +1216,10 @@ pub async fn delete_documents_batch(
let dry_run = is_dry_run(&req, &opt)?;
let task = {
let index_scheduler = index_scheduler.clone();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
tokio::task::spawn_blocking(move || {
index_scheduler.register_with_custom_metadata(task, uid, custom_metadata, dry_run)
})
.await??
};
if network.sharding && !dry_run {
@@ -1244,12 +1273,15 @@ pub struct DocumentDeletionByFilter {
pub async fn delete_documents_by_filter(
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
index_uid: web::Path<String>,
params: AwebQueryParameter<CustomMetadataQuery, DeserrQueryParamError>,
body: AwebJson<DocumentDeletionByFilter, DeserrJsonError>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?body, "Delete documents by filter");
let CustomMetadataQuery { custom_metadata } = params.into_inner();
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
let index_uid = index_uid.into_inner();
let filter = body.into_inner();
@@ -1282,7 +1314,10 @@ pub async fn delete_documents_by_filter(
let dry_run = is_dry_run(&req, &opt)?;
let task = {
let index_scheduler = index_scheduler.clone();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
tokio::task::spawn_blocking(move || {
index_scheduler.register_with_custom_metadata(task, uid, custom_metadata, dry_run)
})
.await??
};
if network.sharding && !dry_run {
@@ -1372,12 +1407,14 @@ impl Aggregate for EditDocumentsByFunctionAggregator {
pub async fn edit_documents_by_function(
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ALL }>, Data<IndexScheduler>>,
index_uid: web::Path<String>,
params: AwebJson<DocumentEditionByFunction, DeserrJsonError>,
params: AwebQueryParameter<CustomMetadataQuery, DeserrQueryParamError>,
body: AwebJson<DocumentEditionByFunction, DeserrJsonError>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?params, "Edit documents by function");
debug!(parameters = ?body, "Edit documents by function");
let CustomMetadataQuery { custom_metadata } = params.into_inner();
index_scheduler
.features()
@@ -1387,23 +1424,23 @@ pub async fn edit_documents_by_function(
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
let index_uid = index_uid.into_inner();
let params = params.into_inner();
let body = body.into_inner();
analytics.publish(
EditDocumentsByFunctionAggregator {
filtered: params.filter.is_some(),
with_context: params.context.is_some(),
filtered: body.filter.is_some(),
with_context: body.context.is_some(),
index_creation: index_scheduler.index(&index_uid).is_err(),
},
&req,
);
let engine = milli::rhai::Engine::new();
if let Err(e) = engine.compile(&params.function) {
if let Err(e) = engine.compile(&body.function) {
return Err(ResponseError::from_msg(e.to_string(), Code::BadRequest));
}
if let Some(ref filter) = params.filter {
if let Some(ref filter) = body.filter {
// we ensure the filter is well formed before enqueuing it
crate::search::parse_filter(
filter,
@@ -1414,8 +1451,8 @@ pub async fn edit_documents_by_function(
}
let task = KindWithContent::DocumentEdition {
index_uid: index_uid.clone(),
filter_expr: params.filter.clone(),
context: match params.context.clone() {
filter_expr: body.filter.clone(),
context: match body.context.clone() {
Some(Value::Object(m)) => Some(m),
None => None,
_ => {
@@ -1425,18 +1462,21 @@ pub async fn edit_documents_by_function(
))
}
},
function: params.function.clone(),
function: body.function.clone(),
};
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task = {
let index_scheduler = index_scheduler.clone();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
tokio::task::spawn_blocking(move || {
index_scheduler.register_with_custom_metadata(task, uid, custom_metadata, dry_run)
})
.await??
};
if network.sharding && !dry_run {
proxy(&index_scheduler, &index_uid, &req, network, Body::Inline(params), &task).await?;
proxy(&index_scheduler, &index_uid, &req, network, Body::Inline(body), &task).await?;
}
let task: SummarizedTaskView = task.into();
@@ -1477,12 +1517,14 @@ pub async fn edit_documents_by_function(
pub async fn clear_all_documents(
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
index_uid: web::Path<String>,
params: AwebQueryParameter<CustomMetadataQuery, DeserrQueryParamError>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
let network = index_scheduler.network();
let CustomMetadataQuery { custom_metadata } = params.into_inner();
analytics.publish(
DocumentsDeletionAggregator {
@@ -1501,7 +1543,10 @@ pub async fn clear_all_documents(
let task = {
let index_scheduler = index_scheduler.clone();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
tokio::task::spawn_blocking(move || {
index_scheduler.register_with_custom_metadata(task, uid, custom_metadata, dry_run)
})
.await??
};
if network.sharding && !dry_run {

View File

@@ -218,6 +218,8 @@ pub struct SummarizedTaskView {
deserialize_with = "time::serde::rfc3339::deserialize"
)]
enqueued_at: OffsetDateTime,
#[serde(default, skip_serializing_if = "Option::is_none")]
custom_metadata: Option<String>,
}
impl From<Task> for SummarizedTaskView {
@@ -228,6 +230,7 @@ impl From<Task> for SummarizedTaskView {
status: task.status,
kind: task.kind.as_kind(),
enqueued_at: task.enqueued_at,
custom_metadata: task.custom_metadata,
}
}
}

View File

@@ -18,10 +18,9 @@ use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use uuid::Uuid;
use crate::search::SearchMetadata;
use super::super::{ComputedFacets, FacetStats, HitsInfo, SearchHit, SearchQueryWithIndex};
use crate::milli::vector::Embedding;
use crate::search::SearchMetadata;
pub const DEFAULT_FEDERATED_WEIGHT: f64 = 1.0;

View File

@@ -91,7 +91,16 @@ impl<'a> Index<'a, Owned> {
documents: Value,
primary_key: Option<&str>,
) -> (Value, StatusCode) {
self._add_documents(documents, primary_key).await
self._add_documents(documents, primary_key, None).await
}
pub async fn add_documents_with_custom_metadata(
&self,
documents: Value,
primary_key: Option<&str>,
custom_metadata: Option<&str>,
) -> (Value, StatusCode) {
self._add_documents(documents, primary_key, custom_metadata).await
}
pub async fn raw_add_documents(
@@ -352,12 +361,25 @@ impl<State> Index<'_, State> {
&self,
documents: Value,
primary_key: Option<&str>,
custom_metadata: Option<&str>,
) -> (Value, StatusCode) {
let url = match primary_key {
Some(key) => {
format!("/indexes/{}/documents?primaryKey={}", urlencode(self.uid.as_ref()), key)
let url = match (primary_key, custom_metadata) {
(Some(key), Some(meta)) => {
format!(
"/indexes/{}/documents?primaryKey={key}&customMetadata={meta}",
urlencode(self.uid.as_ref()),
)
}
None => format!("/indexes/{}/documents", urlencode(self.uid.as_ref())),
(None, Some(meta)) => {
format!(
"/indexes/{}/documents?&customMetadata={meta}",
urlencode(self.uid.as_ref()),
)
}
(Some(key), None) => {
format!("/indexes/{}/documents?&primaryKey={key}", urlencode(self.uid.as_ref()),)
}
(None, None) => format!("/indexes/{}/documents", urlencode(self.uid.as_ref())),
};
self.service.post_encoded(url, documents, self.encoder).await
}

View File

@@ -241,7 +241,7 @@ pub async fn shared_index_with_documents() -> &'static Index<'static, Shared> {
let server = Server::new_shared();
let index = server._index("SHARED_DOCUMENTS").to_shared();
let documents = DOCUMENTS.clone();
let (response, _code) = index._add_documents(documents, None).await;
let (response, _code) = index._add_documents(documents, None, None).await;
server.wait_task(response.uid()).await.succeeded();
let (response, _code) = index
._update_settings(
@@ -284,7 +284,7 @@ pub async fn shared_index_with_score_documents() -> &'static Index<'static, Shar
let server = Server::new_shared();
let index = server._index("SHARED_SCORE_DOCUMENTS").to_shared();
let documents = SCORE_DOCUMENTS.clone();
let (response, _code) = index._add_documents(documents, None).await;
let (response, _code) = index._add_documents(documents, None, None).await;
server.wait_task(response.uid()).await.succeeded();
let (response, _code) = index
._update_settings(
@@ -361,7 +361,7 @@ pub async fn shared_index_with_nested_documents() -> &'static Index<'static, Sha
let server = Server::new_shared();
let index = server._index("SHARED_NESTED_DOCUMENTS").to_shared();
let documents = NESTED_DOCUMENTS.clone();
let (response, _code) = index._add_documents(documents, None).await;
let (response, _code) = index._add_documents(documents, None, None).await;
server.wait_task(response.uid()).await.succeeded();
let (response, _code) = index
._update_settings(
@@ -508,7 +508,7 @@ pub async fn shared_index_with_geo_documents() -> &'static Index<'static, Shared
.get_or_init(|| async {
let server = Server::new_shared();
let index = server._index("SHARED_GEO_DOCUMENTS").to_shared();
let (response, _code) = index._add_documents(GEO_DOCUMENTS.clone(), None).await;
let (response, _code) = index._add_documents(GEO_DOCUMENTS.clone(), None, None).await;
server.wait_task(response.uid()).await.succeeded();
let (response, _code) = index
@@ -531,7 +531,7 @@ pub async fn shared_index_geojson_documents() -> &'static Index<'static, Shared>
let index = server._index("SHARED_GEOJSON_DOCUMENTS").to_shared();
let countries = include_str!("../documents/geojson/assets/countries.json");
let lille = serde_json::from_str::<serde_json::Value>(countries).unwrap();
let (response, _code) = index._add_documents(Value(lille), Some("name")).await;
let (response, _code) = index._add_documents(Value(lille), Some("name"), None).await;
server.wait_task(response.uid()).await.succeeded();
let (response, _code) =

View File

@@ -1339,3 +1339,117 @@ async fn get_document_with_vectors() {
}
"###);
}
#[actix_rt::test]
async fn test_fetch_documents_pagination_with_sorting() {
let server = Server::new_shared();
let index = server.unique_index();
let (task, _code) = index.create(None).await;
server.wait_task(task.uid()).await.succeeded();
// Set name as sortable attribute
let (task, code) = index.update_settings_sortable_attributes(json!(["name"])).await;
assert_eq!(code, 202);
server.wait_task(task.uid()).await.succeeded();
let documents = json!((0..50)
.map(|i| json!({"id": i, "name": format!("doc_{:05}", std::cmp::min(i, 5))}))
.collect::<Vec<_>>());
// Add documents as described in the bug report
let (task, code) = index.add_documents(documents, None).await;
assert_eq!(code, 202);
server.wait_task(task.uid()).await.succeeded();
// Request 1 (first page): offset 0, limit 2
let (response, code) = index
.fetch_documents(json!({
"offset": 0,
"limit": 2,
"sort": ["name:asc"]
}))
.await;
assert_eq!(code, 200);
let results = response["results"].as_array().unwrap();
snapshot!(json_string!(results), @r###"
[
{
"id": 0,
"name": "doc_00000"
},
{
"id": 1,
"name": "doc_00001"
}
]
"###);
// Request 2 (second page): offset 2, limit 2
let (response, code) = index
.fetch_documents(json!({
"offset": 2,
"limit": 2,
"sort": ["name:asc"]
}))
.await;
assert_eq!(code, 200);
let results = response["results"].as_array().unwrap();
snapshot!(json_string!(results), @r###"
[
{
"id": 2,
"name": "doc_00002"
},
{
"id": 3,
"name": "doc_00003"
}
]
"###);
// Request 3 (third page): offset 4, limit 2
let (response, code) = index
.fetch_documents(json!({
"offset": 4,
"limit": 2,
"sort": ["name:asc"]
}))
.await;
assert_eq!(code, 200);
let results = response["results"].as_array().unwrap();
snapshot!(json_string!(results), @r###"
[
{
"id": 4,
"name": "doc_00004"
},
{
"id": 5,
"name": "doc_00005"
}
]
"###);
// Request 4 (fourth page): offset 6, limit 2
let (response, code) = index
.fetch_documents(json!({
"offset": 6,
"limit": 2,
"sort": ["name:asc"]
}))
.await;
assert_eq!(code, 200);
let results = response["results"].as_array().unwrap();
snapshot!(json_string!(results), @r###"
[
{
"id": 6,
"name": "doc_00005"
},
{
"id": 7,
"name": "doc_00005"
}
]
"###);
}

View File

@@ -137,6 +137,60 @@ static SIMPLE_SEARCH_DOCUMENTS: Lazy<Value> = Lazy::new(|| {
}])
});
static MANY_DOCS: Lazy<Value> = Lazy::new(|| {
json!([
{
"title": "Shazam!",
"desc": "a Captain Marvel ersatz",
"id": "1",
},
{
"title": "Captain Planet",
"desc": "He's not part of the Marvel Cinematic Universe",
"id": "2",
},
{
"title": "Captain Marvel",
"desc": "a Shazam ersatz",
"id": "3",
},
{
"title": "Captain Marvel",
"desc": "a Shazam ersatz",
"id": "4",
},
{
"title": "Captain Marvel",
"desc": "a Shazam ersatz",
"id": "5",
},
{
"title": "Captain Marvel",
"desc": "a Shazam ersatz",
"id": "6",
},
{
"title": "Captain Marvel",
"desc": "a Shazam ersatz",
"id": "7",
},
{
"title": "Captain Marvel",
"desc": "a Shazam ersatz",
"id": "8",
},
{
"title": "Captain Marvel",
"desc": "a Shazam ersatz",
"id": "9",
},
{
"title": "Captain Marvel",
"desc": "a Shazam ersatz",
"id": "10",
}])
});
#[actix_rt::test]
async fn simple_search() {
let server = Server::new_shared();
@@ -449,6 +503,38 @@ async fn simple_search_hf() {
snapshot!(response["semanticHitCount"], @"3");
}
#[actix_rt::test]
async fn issue_5976_missing_docs_hf() {
let server = Server::new_shared();
let index = index_with_documents_hf(server, &MANY_DOCS).await;
let (response, code) = index
.search_post(
json!({"q": "Wonder replacement", "hybrid": {"embedder": "default", "semanticRatio": 1.0}, "retrieveVectors": true}),
)
.await;
snapshot!(code, @"200 OK");
let are_empty: Vec<_> = response["hits"]
.as_array()
.unwrap()
.iter()
.map(|hit| hit["_vectors"]["default"]["embeddings"].as_array().unwrap().is_empty())
.collect();
snapshot!(json!(are_empty), @r###"
[
false,
false,
false,
false,
false,
false,
false,
false,
false,
false
]
"###);
}
#[actix_rt::test]
async fn distribution_shift() {
let server = Server::new_shared();

View File

@@ -3141,3 +3141,513 @@ fn fail(override_response_body: Option<&str>) -> ResponseTemplate {
response.set_body_json(json!({"error": "provoked error", "code": "test_error", "link": "https://docs.meilisearch.com/errors#test_error"}))
}
}
#[actix_rt::test]
async fn remote_auto_sharding() {
let ms0 = Server::new().await;
let ms1 = Server::new().await;
let ms2 = Server::new().await;
// enable feature
let (response, code) = ms0.set_features(json!({"network": true})).await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(response["network"]), @"true");
let (response, code) = ms1.set_features(json!({"network": true})).await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(response["network"]), @"true");
let (response, code) = ms2.set_features(json!({"network": true})).await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(response["network"]), @"true");
// set self & sharding
let (response, code) = ms0.set_network(json!({"self": "ms0", "sharding": true})).await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(response), @r###"
{
"self": "ms0",
"remotes": {},
"sharding": true
}
"###);
let (response, code) = ms1.set_network(json!({"self": "ms1", "sharding": true})).await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(response), @r###"
{
"self": "ms1",
"remotes": {},
"sharding": true
}
"###);
let (response, code) = ms2.set_network(json!({"self": "ms2", "sharding": true})).await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(response), @r###"
{
"self": "ms2",
"remotes": {},
"sharding": true
}
"###);
// wrap servers
let ms0 = Arc::new(ms0);
let ms1 = Arc::new(ms1);
let ms2 = Arc::new(ms2);
let rms0 = LocalMeili::new(ms0.clone()).await;
let rms1 = LocalMeili::new(ms1.clone()).await;
let rms2 = LocalMeili::new(ms2.clone()).await;
// set network
let network = json!({"remotes": {
"ms0": {
"url": rms0.url()
},
"ms1": {
"url": rms1.url()
},
"ms2": {
"url": rms2.url()
}
}});
println!("{}", serde_json::to_string_pretty(&network).unwrap());
let (_response, status_code) = ms0.set_network(network.clone()).await;
snapshot!(status_code, @"200 OK");
let (_response, status_code) = ms1.set_network(network.clone()).await;
snapshot!(status_code, @"200 OK");
let (_response, status_code) = ms2.set_network(network.clone()).await;
snapshot!(status_code, @"200 OK");
// add documents
let documents = SCORE_DOCUMENTS.clone();
let documents = documents.as_array().unwrap();
let index0 = ms0.index("test");
let _index1 = ms1.index("test");
let _index2 = ms2.index("test");
let (task, _status_code) = index0.add_documents(json!(documents), None).await;
let t0 = task.uid();
let (t, _) = ms0.get_task(task.uid()).await;
let t1 = t["network"]["remote_tasks"]["ms1"]["taskUid"].as_u64().unwrap();
let t2 = t["network"]["remote_tasks"]["ms2"]["taskUid"].as_u64().unwrap();
ms0.wait_task(t0).await.succeeded();
ms1.wait_task(t1).await.succeeded();
ms2.wait_task(t2).await.succeeded();
// perform multi-search
let query = "badman returns";
let request = json!({
"federation": {},
"queries": [
{
"q": query,
"indexUid": "test",
"federationOptions": {
"remote": "ms0"
}
},
{
"q": query,
"indexUid": "test",
"federationOptions": {
"remote": "ms1"
}
},
{
"q": query,
"indexUid": "test",
"federationOptions": {
"remote": "ms2"
}
},
]
});
let (response, _status_code) = ms0.multi_search(request.clone()).await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(response, { ".processingTimeMs" => "[time]", ".requestUid" => "[uuid]" }), @r###"
{
"hits": [
{
"title": "Batman Returns",
"id": "C",
"_federation": {
"indexUid": "test",
"queriesPosition": 2,
"weightedRankingScore": 0.8317901234567902,
"remote": "ms2"
}
},
{
"title": "Batman the dark knight returns: Part 1",
"id": "A",
"_federation": {
"indexUid": "test",
"queriesPosition": 1,
"weightedRankingScore": 0.7028218694885362,
"remote": "ms1"
}
},
{
"title": "Batman the dark knight returns: Part 2",
"id": "B",
"_federation": {
"indexUid": "test",
"queriesPosition": 1,
"weightedRankingScore": 0.7028218694885362,
"remote": "ms1"
}
},
{
"title": "Badman",
"id": "E",
"_federation": {
"indexUid": "test",
"queriesPosition": 2,
"weightedRankingScore": 0.5,
"remote": "ms2"
}
},
{
"title": "Batman",
"id": "D",
"_federation": {
"indexUid": "test",
"queriesPosition": 0,
"weightedRankingScore": 0.23106060606060605,
"remote": "ms0"
}
}
],
"processingTimeMs": "[time]",
"limit": 20,
"offset": 0,
"estimatedTotalHits": 5,
"requestUid": "[uuid]",
"remoteErrors": {}
}
"###);
let (response, _status_code) = ms1.multi_search(request.clone()).await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(response, { ".processingTimeMs" => "[time]", ".requestUid" => "[uuid]" }), @r###"
{
"hits": [
{
"title": "Batman Returns",
"id": "C",
"_federation": {
"indexUid": "test",
"queriesPosition": 2,
"weightedRankingScore": 0.8317901234567902,
"remote": "ms2"
}
},
{
"title": "Batman the dark knight returns: Part 1",
"id": "A",
"_federation": {
"indexUid": "test",
"queriesPosition": 1,
"weightedRankingScore": 0.7028218694885362,
"remote": "ms1"
}
},
{
"title": "Batman the dark knight returns: Part 2",
"id": "B",
"_federation": {
"indexUid": "test",
"queriesPosition": 1,
"weightedRankingScore": 0.7028218694885362,
"remote": "ms1"
}
},
{
"title": "Badman",
"id": "E",
"_federation": {
"indexUid": "test",
"queriesPosition": 2,
"weightedRankingScore": 0.5,
"remote": "ms2"
}
},
{
"title": "Batman",
"id": "D",
"_federation": {
"indexUid": "test",
"queriesPosition": 0,
"weightedRankingScore": 0.23106060606060605,
"remote": "ms0"
}
}
],
"processingTimeMs": "[time]",
"limit": 20,
"offset": 0,
"estimatedTotalHits": 5,
"requestUid": "[uuid]",
"remoteErrors": {}
}
"###);
let (response, _status_code) = ms2.multi_search(request.clone()).await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(response, { ".processingTimeMs" => "[time]", ".requestUid" => "[uuid]" }), @r###"
{
"hits": [
{
"title": "Batman Returns",
"id": "C",
"_federation": {
"indexUid": "test",
"queriesPosition": 2,
"weightedRankingScore": 0.8317901234567902,
"remote": "ms2"
}
},
{
"title": "Batman the dark knight returns: Part 1",
"id": "A",
"_federation": {
"indexUid": "test",
"queriesPosition": 1,
"weightedRankingScore": 0.7028218694885362,
"remote": "ms1"
}
},
{
"title": "Batman the dark knight returns: Part 2",
"id": "B",
"_federation": {
"indexUid": "test",
"queriesPosition": 1,
"weightedRankingScore": 0.7028218694885362,
"remote": "ms1"
}
},
{
"title": "Badman",
"id": "E",
"_federation": {
"indexUid": "test",
"queriesPosition": 2,
"weightedRankingScore": 0.5,
"remote": "ms2"
}
},
{
"title": "Batman",
"id": "D",
"_federation": {
"indexUid": "test",
"queriesPosition": 0,
"weightedRankingScore": 0.23106060606060605,
"remote": "ms0"
}
}
],
"processingTimeMs": "[time]",
"limit": 20,
"offset": 0,
"estimatedTotalHits": 5,
"requestUid": "[uuid]",
"remoteErrors": {}
}
"###);
}
#[actix_rt::test]
async fn remote_auto_sharding_with_custom_metadata() {
let ms0 = Server::new().await;
let ms1 = Server::new().await;
let ms2 = Server::new().await;
// enable feature
let (response, code) = ms0.set_features(json!({"network": true})).await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(response["network"]), @"true");
let (response, code) = ms1.set_features(json!({"network": true})).await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(response["network"]), @"true");
let (response, code) = ms2.set_features(json!({"network": true})).await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(response["network"]), @"true");
// set self & sharding
let (response, code) = ms0.set_network(json!({"self": "ms0", "sharding": true})).await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(response), @r###"
{
"self": "ms0",
"remotes": {},
"sharding": true
}
"###);
let (response, code) = ms1.set_network(json!({"self": "ms1", "sharding": true})).await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(response), @r###"
{
"self": "ms1",
"remotes": {},
"sharding": true
}
"###);
let (response, code) = ms2.set_network(json!({"self": "ms2", "sharding": true})).await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(response), @r###"
{
"self": "ms2",
"remotes": {},
"sharding": true
}
"###);
// wrap servers
let ms0 = Arc::new(ms0);
let ms1 = Arc::new(ms1);
let ms2 = Arc::new(ms2);
let rms0 = LocalMeili::new(ms0.clone()).await;
let rms1 = LocalMeili::new(ms1.clone()).await;
let rms2 = LocalMeili::new(ms2.clone()).await;
// set network
let network = json!({"remotes": {
"ms0": {
"url": rms0.url()
},
"ms1": {
"url": rms1.url()
},
"ms2": {
"url": rms2.url()
}
}});
println!("{}", serde_json::to_string_pretty(&network).unwrap());
let (_response, status_code) = ms0.set_network(network.clone()).await;
snapshot!(status_code, @"200 OK");
let (_response, status_code) = ms1.set_network(network.clone()).await;
snapshot!(status_code, @"200 OK");
let (_response, status_code) = ms2.set_network(network.clone()).await;
snapshot!(status_code, @"200 OK");
// add documents
let documents = SCORE_DOCUMENTS.clone();
let documents = documents.as_array().unwrap();
let index0 = ms0.index("test");
let _index1 = ms1.index("test");
let _index2 = ms2.index("test");
let (task, _status_code) = index0
.add_documents_with_custom_metadata(
json!(documents),
None,
Some("remote_auto_sharding_with_custom_metadata"),
)
.await;
let t0 = task.uid();
let (t, _) = ms0.get_task(task.uid()).await;
let t1 = t["network"]["remote_tasks"]["ms1"]["taskUid"].as_u64().unwrap();
let t2 = t["network"]["remote_tasks"]["ms2"]["taskUid"].as_u64().unwrap();
let t = ms0.wait_task(t0).await.succeeded();
snapshot!(t, @r###"
{
"uid": "[uid]",
"batchUid": "[batch_uid]",
"indexUid": "test",
"status": "succeeded",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 5,
"indexedDocuments": 1
},
"error": null,
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"network": {
"remote_tasks": {
"ms1": {
"taskUid": 0,
"error": null
},
"ms2": {
"taskUid": 0,
"error": null
}
}
},
"customMetadata": "remote_auto_sharding_with_custom_metadata"
}
"###);
let t = ms1.wait_task(t1).await.succeeded();
snapshot!(t, @r###"
{
"uid": "[uid]",
"batchUid": "[batch_uid]",
"indexUid": "test",
"status": "succeeded",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 5,
"indexedDocuments": 2
},
"error": null,
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"network": {
"origin": {
"remoteName": "ms0",
"taskUid": 0
}
},
"customMetadata": "remote_auto_sharding_with_custom_metadata"
}
"###);
let t = ms2.wait_task(t2).await.succeeded();
snapshot!(t, @r###"
{
"uid": "[uid]",
"batchUid": "[batch_uid]",
"indexUid": "test",
"status": "succeeded",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 5,
"indexedDocuments": 2
},
"error": null,
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"network": {
"origin": {
"remoteName": "ms0",
"taskUid": 0
}
},
"customMetadata": "remote_auto_sharding_with_custom_metadata"
}
"###);
}

View File

@@ -656,3 +656,119 @@ async fn forbidden_fields() {
}
"#);
}
#[actix_web::test]
async fn receive_custom_metadata() {
let WebhookHandle { server_handle: handle1, url: url1, receiver: mut receiver1 } =
create_webhook_server().await;
let WebhookHandle { server_handle: handle2, url: url2, receiver: mut receiver2 } =
create_webhook_server().await;
let WebhookHandle { server_handle: handle3, url: url3, receiver: mut receiver3 } =
create_webhook_server().await;
let db_path = tempfile::tempdir().unwrap();
let server = Server::new_with_options(Opt {
task_webhook_url: Some(Url::parse(&url3).unwrap()),
..default_settings(db_path.path())
})
.await
.unwrap();
for url in [url1, url2] {
let (value, code) = server.create_webhook(json!({ "url": url })).await;
snapshot!(code, @"201 Created");
snapshot!(json_string!(value, { ".uuid" => "[uuid]", ".url" => "[ignored]" }), @r#"
{
"uuid": "[uuid]",
"isEditable": true,
"url": "[ignored]",
"headers": {}
}
"#);
}
let index = server.index("tamo");
let (response, code) = index
.add_documents_with_custom_metadata(
json!({ "id": 1, "doggo": "bone" }),
None,
Some("test_meta"),
)
.await;
snapshot!(response, @r###"
{
"taskUid": 0,
"indexUid": "tamo",
"status": "enqueued",
"type": "documentAdditionOrUpdate",
"enqueuedAt": "[date]",
"customMetadata": "test_meta"
}
"###);
snapshot!(code, @"202 Accepted");
let mut count1 = 0;
let mut count2 = 0;
let mut count3 = 0;
while count1 == 0 || count2 == 0 || count3 == 0 {
tokio::select! {
msg = receiver1.recv() => {
if let Some(msg) = msg {
count1 += 1;
check_metadata(msg);
}
},
msg = receiver2.recv() => {
if let Some(msg) = msg {
count2 += 1;
check_metadata(msg);
}
},
msg = receiver3.recv() => {
if let Some(msg) = msg {
count3 += 1;
check_metadata(msg);
}
},
}
}
assert_eq!(count1, 1);
assert_eq!(count2, 1);
assert_eq!(count3, 1);
handle1.abort();
handle2.abort();
handle3.abort();
}
fn check_metadata(msg: Vec<u8>) {
let msg = String::from_utf8(msg).unwrap();
let tasks = msg.split('\n');
for task in tasks {
if task.is_empty() {
continue;
}
let task: serde_json::Value = serde_json::from_str(task).unwrap();
snapshot!(common::Value(task), @r###"
{
"uid": "[uid]",
"batchUid": "[batch_uid]",
"indexUid": "tamo",
"status": "succeeded",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 1,
"indexedDocuments": 1
},
"error": null,
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"customMetadata": "test_meta"
}
"###);
}
}

View File

@@ -43,7 +43,7 @@ async fn version_too_old() {
std::fs::write(db_path.join("VERSION"), "1.11.9999").unwrap();
let options = Opt { experimental_dumpless_upgrade: true, ..default_settings };
let err = Server::new_with_options(options).await.map(|_| ()).unwrap_err();
snapshot!(err, @"Database version 1.11.9999 is too old for the experimental dumpless upgrade feature. Please generate a dump using the v1.11.9999 and import it in the v1.24.0");
snapshot!(err, @"Database version 1.11.9999 is too old for the experimental dumpless upgrade feature. Please generate a dump using the v1.11.9999 and import it in the v1.26.0");
}
#[actix_rt::test]
@@ -58,7 +58,7 @@ async fn version_requires_downgrade() {
std::fs::write(db_path.join("VERSION"), format!("{major}.{minor}.{patch}")).unwrap();
let options = Opt { experimental_dumpless_upgrade: true, ..default_settings };
let err = Server::new_with_options(options).await.map(|_| ()).unwrap_err();
snapshot!(err, @"Database version 1.24.1 is higher than the Meilisearch version 1.24.0. Downgrade is not supported");
snapshot!(err, @"Database version 1.26.1 is higher than the Meilisearch version 1.26.0. Downgrade is not supported");
}
#[actix_rt::test]

View File

@@ -8,7 +8,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"progress": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.24.0"
"upgradeTo": "v1.26.0"
},
"stats": {
"totalNbTasks": 1,

View File

@@ -8,7 +8,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"progress": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.24.0"
"upgradeTo": "v1.26.0"
},
"stats": {
"totalNbTasks": 1,

View File

@@ -8,7 +8,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"progress": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.24.0"
"upgradeTo": "v1.26.0"
},
"stats": {
"totalNbTasks": 1,

View File

@@ -12,7 +12,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"canceledBy": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.24.0"
"upgradeTo": "v1.26.0"
},
"error": null,
"duration": "[duration]",

View File

@@ -12,7 +12,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"canceledBy": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.24.0"
"upgradeTo": "v1.26.0"
},
"error": null,
"duration": "[duration]",

View File

@@ -12,7 +12,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"canceledBy": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.24.0"
"upgradeTo": "v1.26.0"
},
"error": null,
"duration": "[duration]",

View File

@@ -8,7 +8,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"progress": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.24.0"
"upgradeTo": "v1.26.0"
},
"stats": {
"totalNbTasks": 1,

View File

@@ -12,7 +12,7 @@ source: crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
"canceledBy": null,
"details": {
"upgradeFrom": "v1.12.0",
"upgradeTo": "v1.24.0"
"upgradeTo": "v1.26.0"
},
"error": null,
"duration": "[duration]",

View File

@@ -74,12 +74,13 @@ csv = "1.3.1"
candle-core = { version = "0.9.1" }
candle-transformers = { version = "0.9.1" }
candle-nn = { version = "0.9.1" }
tokenizers = { git = "https://github.com/huggingface/tokenizers.git", tag = "v0.15.2", version = "0.15.2", default-features = false, features = [
tokenizers = { version = "0.22.1", default-features = false, features = [
"onig",
] }
hf-hub = { git = "https://github.com/dureuill/hf-hub.git", branch = "rust_tls", default-features = false, features = [
"online",
] }
safetensors = "0.6.2"
tiktoken-rs = "0.7.0"
liquid = "0.26.11"
rhai = { version = "1.22.2", features = [
@@ -100,7 +101,6 @@ bumpalo = "3.18.1"
bumparaw-collections = "0.1.4"
steppe = { version = "0.4", default-features = false }
thread_local = "1.1.9"
allocator-api2 = "0.3.0"
rustc-hash = "2.1.1"
enum-iterator = "2.1.0"
bbqueue = { git = "https://github.com/meilisearch/bbqueue" }

View File

@@ -87,7 +87,7 @@ impl Iterator for SortedDocumentsIterator<'_> {
};
// Otherwise don't directly iterate over children, skip them if we know we will go further
let mut to_skip = n - 1;
let mut to_skip = n;
while to_skip > 0 {
if let Err(e) = SortedDocumentsIterator::update_current(
current_child,
@@ -108,7 +108,7 @@ impl Iterator for SortedDocumentsIterator<'_> {
continue;
} else {
// The current iterator is large enough, so we can forward the call to it.
return inner.nth(to_skip + 1);
return inner.nth(to_skip);
}
}

View File

@@ -1173,6 +1173,7 @@ pub fn extract_embeddings_from_fragments<R: io::Read + io::Seek>(
request_threads,
&doc_alloc,
embedder_stats,
false,
on_embed,
);

View File

@@ -35,6 +35,7 @@ pub struct EmbeddingExtractor<'a, 'b> {
possible_embedding_mistakes: PossibleEmbeddingMistakes,
embedder_stats: &'a EmbedderStats,
threads: &'a ThreadPoolNoAbort,
failure_modes: EmbedderFailureModes,
}
impl<'a, 'b> EmbeddingExtractor<'a, 'b> {
@@ -46,7 +47,15 @@ impl<'a, 'b> EmbeddingExtractor<'a, 'b> {
threads: &'a ThreadPoolNoAbort,
) -> Self {
let possible_embedding_mistakes = PossibleEmbeddingMistakes::new(field_distribution);
Self { embedders, sender, threads, possible_embedding_mistakes, embedder_stats }
let failure_modes = EmbedderFailureModes::from_env();
Self {
embedders,
sender,
threads,
possible_embedding_mistakes,
embedder_stats,
failure_modes,
}
}
}
@@ -91,6 +100,7 @@ impl<'extractor> Extractor<'extractor> for EmbeddingExtractor<'_, '_> {
self.threads,
self.sender,
&context.doc_alloc,
self.failure_modes,
))
}
@@ -267,6 +277,7 @@ pub struct SettingsChangeEmbeddingExtractor<'a, 'b, SD> {
sender: EmbeddingSender<'a, 'b>,
possible_embedding_mistakes: PossibleEmbeddingMistakes,
threads: &'a ThreadPoolNoAbort,
failure_modes: EmbedderFailureModes,
}
impl<'a, 'b, SD: SettingsDelta> SettingsChangeEmbeddingExtractor<'a, 'b, SD> {
@@ -279,7 +290,16 @@ impl<'a, 'b, SD: SettingsDelta> SettingsChangeEmbeddingExtractor<'a, 'b, SD> {
threads: &'a ThreadPoolNoAbort,
) -> Self {
let possible_embedding_mistakes = PossibleEmbeddingMistakes::new(field_distribution);
Self { settings_delta, embedder_stats, sender, threads, possible_embedding_mistakes }
let failure_modes = EmbedderFailureModes::from_env();
Self {
settings_delta,
embedder_stats,
sender,
threads,
possible_embedding_mistakes,
failure_modes,
}
}
}
@@ -336,6 +356,7 @@ impl<'extractor, SD: SettingsDelta + Sync> SettingsChangeExtractor<'extractor>
self.threads,
self.sender,
&context.doc_alloc,
self.failure_modes,
),
reindex_action,
));
@@ -539,6 +560,7 @@ struct Chunks<'a, 'b, 'extractor> {
enum ChunkType<'a, 'b> {
DocumentTemplate {
document_template: &'a Prompt,
ignore_document_template_failures: bool,
session: EmbedSession<'a, OnEmbeddingDocumentUpdates<'a, 'b>, &'a str>,
},
Fragments {
@@ -559,6 +581,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
threads: &'a ThreadPoolNoAbort,
sender: EmbeddingSender<'a, 'b>,
doc_alloc: &'a Bump,
failure_modes: EmbedderFailureModes,
) -> Self {
let embedder = &runtime.embedder;
let dimensions = embedder.dimensions();
@@ -567,12 +590,14 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
let kind = if fragments.is_empty() {
ChunkType::DocumentTemplate {
document_template: &runtime.document_template,
ignore_document_template_failures: failure_modes.ignore_document_template_failures,
session: EmbedSession::new(
&runtime.embedder,
embedder_name,
threads,
doc_alloc,
embedder_stats,
failure_modes.ignore_embedder_failures,
OnEmbeddingDocumentUpdates {
embedder_id: embedder_info.embedder_id,
sender,
@@ -589,6 +614,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
threads,
doc_alloc,
embedder_stats,
failure_modes.ignore_embedder_failures,
OnEmbeddingDocumentUpdates {
embedder_id: embedder_info.embedder_id,
sender,
@@ -693,7 +719,11 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
},
)?;
}
ChunkType::DocumentTemplate { document_template, session } => {
ChunkType::DocumentTemplate {
document_template,
ignore_document_template_failures,
session,
} => {
let doc_alloc = session.doc_alloc();
let old_embedder = settings_delta.old_embedders().get(session.embedder_name());
@@ -702,6 +732,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
} else {
old_embedder.as_ref().map(|old_embedder| &old_embedder.document_template)
};
let extractor =
DocumentTemplateExtractor::new(document_template, doc_alloc, fields_ids_map);
let old_extractor = old_document_template.map(|old_document_template| {
@@ -710,7 +741,15 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
let metadata =
Metadata { docid, external_docid, extractor_id: extractor.extractor_id() };
match extractor.diff_settings(document, &external_docid, old_extractor.as_ref())? {
let extractor_diff = if *ignore_document_template_failures {
let extractor = extractor.ignore_errors();
let old_extractor = old_extractor.map(DocumentTemplateExtractor::ignore_errors);
extractor.diff_settings(document, &external_docid, old_extractor.as_ref())?
} else {
extractor.diff_settings(document, &external_docid, old_extractor.as_ref())?
};
match extractor_diff {
ExtractorDiff::Removed => {
if old_is_user_provided || full_reindex {
session.on_embed_mut().clear_vectors(docid);
@@ -758,7 +797,11 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
new_must_regenerate,
);
match &mut self.kind {
ChunkType::DocumentTemplate { document_template, session } => {
ChunkType::DocumentTemplate {
document_template,
ignore_document_template_failures,
session,
} => {
let doc_alloc = session.doc_alloc();
let ex = DocumentTemplateExtractor::new(
document_template,
@@ -766,18 +809,33 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
new_fields_ids_map,
);
update_autogenerated(
docid,
external_docid,
[ex],
old_document,
new_document,
&external_docid,
old_must_regenerate,
old_is_user_provided,
session,
unused_vectors_distribution,
)?
if *ignore_document_template_failures {
update_autogenerated(
docid,
external_docid,
[ex.ignore_errors()],
old_document,
new_document,
&external_docid,
old_must_regenerate,
old_is_user_provided,
session,
unused_vectors_distribution,
)
} else {
update_autogenerated(
docid,
external_docid,
[ex],
old_document,
new_document,
&external_docid,
old_must_regenerate,
old_is_user_provided,
session,
unused_vectors_distribution,
)
}?
}
ChunkType::Fragments { fragments, session } => {
let doc_alloc = session.doc_alloc();
@@ -844,23 +902,38 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
);
match &mut self.kind {
ChunkType::DocumentTemplate { document_template, session } => {
ChunkType::DocumentTemplate {
document_template,
ignore_document_template_failures,
session,
} => {
let doc_alloc = session.doc_alloc();
let ex = DocumentTemplateExtractor::new(
document_template,
doc_alloc,
new_fields_ids_map,
);
insert_autogenerated(
docid,
external_docid,
[ex],
new_document,
&external_docid,
session,
unused_vectors_distribution,
)?;
if *ignore_document_template_failures {
insert_autogenerated(
docid,
external_docid,
[ex.ignore_errors()],
new_document,
&external_docid,
session,
unused_vectors_distribution,
)?;
} else {
insert_autogenerated(
docid,
external_docid,
[ex],
new_document,
&external_docid,
session,
unused_vectors_distribution,
)?;
}
}
ChunkType::Fragments { fragments, session } => {
let doc_alloc = session.doc_alloc();
@@ -884,7 +957,11 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
pub fn drain(self, unused_vectors_distribution: &UnusedVectorsDistributionBump) -> Result<()> {
match self.kind {
ChunkType::DocumentTemplate { document_template: _, session } => {
ChunkType::DocumentTemplate {
document_template: _,
ignore_document_template_failures: _,
session,
} => {
session.drain(unused_vectors_distribution)?;
}
ChunkType::Fragments { fragments: _, session } => {
@@ -896,9 +973,11 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
pub fn embedder_name(&self) -> &'a str {
match &self.kind {
ChunkType::DocumentTemplate { document_template: _, session } => {
session.embedder_name()
}
ChunkType::DocumentTemplate {
document_template: _,
ignore_document_template_failures: _,
session,
} => session.embedder_name(),
ChunkType::Fragments { fragments: _, session } => session.embedder_name(),
}
}
@@ -967,7 +1046,11 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
}
}
match &mut self.kind {
ChunkType::DocumentTemplate { document_template: _, session } => {
ChunkType::DocumentTemplate {
document_template: _,
ignore_document_template_failures: _,
session,
} => {
session.on_embed_mut().process_embeddings(
Metadata { docid, external_docid, extractor_id: 0 },
embeddings,
@@ -1078,3 +1161,41 @@ where
Ok(())
}
#[derive(Clone, Copy, PartialEq, Eq, Default)]
struct EmbedderFailureModes {
pub ignore_document_template_failures: bool,
pub ignore_embedder_failures: bool,
}
impl EmbedderFailureModes {
fn from_env() -> Self {
match std::env::var("MEILI_EXPERIMENTAL_CONFIG_EMBEDDER_FAILURE_MODES") {
Ok(failure_modes) => Self::parse_from_str(
&failure_modes,
"`MEILI_EXPERIMENTAL_CONFIG_EMBEDDER_FAILURE_MODES`",
),
Err(std::env::VarError::NotPresent) => Self::default(),
Err(std::env::VarError::NotUnicode(_)) => panic!(
"`MEILI_EXPERIMENTAL_CONFIG_EMBEDDER_FAILURE_MODES` contains a non-unicode value"
),
}
}
fn parse_from_str(failure_modes: &str, provenance: &'static str) -> Self {
let Self { mut ignore_document_template_failures, mut ignore_embedder_failures } =
Default::default();
for segment in failure_modes.split(',') {
let segment = segment.trim();
match segment {
"ignore_document_template_failures" => {
ignore_document_template_failures = true;
}
"ignore_embedder_failures" => ignore_embedder_failures = true,
"" => continue,
segment => panic!("Unrecognized segment value for {provenance}: {segment}"),
}
}
Self { ignore_document_template_failures, ignore_embedder_failures }
}
}

View File

@@ -1631,8 +1631,11 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
// Update index settings
let embedding_config_updates = self.update_embedding_configs()?;
self.update_user_defined_searchable_attributes()?;
let new_inner_settings = InnerIndexSettings::from_index(self.index, self.wtxn, None)?;
let mut new_inner_settings =
InnerIndexSettings::from_index(self.index, self.wtxn, None)?;
new_inner_settings.recompute_searchables(self.wtxn, self.index)?;
let primary_key_id = self
.index

View File

@@ -42,6 +42,8 @@ const UPGRADE_FUNCTIONS: &[&dyn UpgradeIndex] = &[
&ToTargetNoOp { target: (1, 22, 0) },
&ToTargetNoOp { target: (1, 23, 0) },
&ToTargetNoOp { target: (1, 24, 0) },
&ToTargetNoOp { target: (1, 25, 0) },
&ToTargetNoOp { target: (1, 26, 0) },
// This is the last upgrade function, it will be called when the index is up to date.
// any other upgrade function should be added before this one.
&ToCurrentNoOp {},
@@ -77,6 +79,8 @@ const fn start(from: (u32, u32, u32)) -> Option<usize> {
(1, 22, _) => function_index!(12),
(1, 23, _) => function_index!(13),
(1, 24, _) => function_index!(14),
(1, 25, _) => function_index!(15),
(1, 26, _) => function_index!(16),
// We deliberately don't add a placeholder with (VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH) here to force manually
// considering dumpless upgrade.
(_major, _minor, _patch) => return None,

View File

@@ -1,9 +1,11 @@
use candle_core::Tensor;
use candle_nn::VarBuilder;
use candle_transformers::models::bert::{BertModel, Config, DTYPE};
use candle_transformers::models::bert::{BertModel, Config as BertConfig, DTYPE};
use candle_transformers::models::modernbert::{Config as ModernConfig, ModernBert};
// FIXME: currently we'll be using the hub to retrieve model, in the future we might want to embed it into Meilisearch itself
use hf_hub::api::sync::Api;
use hf_hub::{Repo, RepoType};
use safetensors::SafeTensors;
use tokenizers::{PaddingParams, Tokenizer};
use super::EmbeddingCache;
@@ -84,14 +86,21 @@ impl Default for EmbedderOptions {
}
}
enum ModelKind {
Bert(BertModel),
Modern(ModernBert),
}
/// Perform embedding of documents and queries
pub struct Embedder {
model: BertModel,
model: ModelKind,
tokenizer: Tokenizer,
options: EmbedderOptions,
dimensions: usize,
pooling: Pooling,
cache: EmbeddingCache,
device: candle_core::Device,
max_len: usize,
}
impl std::fmt::Debug for Embedder {
@@ -101,10 +110,60 @@ impl std::fmt::Debug for Embedder {
.field("tokenizer", &self.tokenizer)
.field("options", &self.options)
.field("pooling", &self.pooling)
.field("device", &self.device)
.field("max_len", &self.max_len)
.finish()
}
}
// some models do not have the "model." prefix in their safetensors weights
fn change_tensor_names(
weights_path: &std::path::Path,
) -> Result<std::path::PathBuf, NewEmbedderError> {
let data = std::fs::read(weights_path)
.map_err(|e| NewEmbedderError::safetensor_weight(candle_core::Error::Io(e)))?;
let tensors = SafeTensors::deserialize(&data)
.map_err(|e| NewEmbedderError::safetensor_weight(candle_core::Error::Msg(e.to_string())))?;
let names = tensors.names();
let has_model_prefix = names.iter().any(|n| n.starts_with("model."));
if has_model_prefix {
return Ok(weights_path.to_path_buf());
}
let fixed_path = weights_path.with_extension("fixed.safetensors");
if fixed_path.exists() {
return Ok(fixed_path);
}
let mut new_tensors = vec![];
for name in names {
let tensor_view = tensors.tensor(name).map_err(|e| {
NewEmbedderError::safetensor_weight(candle_core::Error::Msg(e.to_string()))
})?;
let new_name = format!("model.{}", name);
let data_offset = tensor_view.data();
let shape = tensor_view.shape();
let dtype = tensor_view.dtype();
new_tensors.push((new_name, shape.to_vec(), dtype, data_offset));
}
use safetensors::tensor::TensorView;
let views = new_tensors.iter().map(|(name, shape, dtype, data)| {
(name.as_str(), TensorView::new(*dtype, shape.clone(), data).unwrap())
});
safetensors::serialize_to_file(views, None, &fixed_path)
.map_err(|e| NewEmbedderError::safetensor_weight(candle_core::Error::Msg(e.to_string())))?;
Ok(fixed_path)
}
#[derive(Clone, Copy, serde::Deserialize)]
struct PoolingConfig {
#[serde(default)]
@@ -220,19 +279,42 @@ impl Embedder {
(config, tokenizer, weights, source, pooling)
};
let config = std::fs::read_to_string(&config_filename)
let config_str = std::fs::read_to_string(&config_filename)
.map_err(|inner| NewEmbedderError::open_config(config_filename.clone(), inner))?;
let config: Config = serde_json::from_str(&config).map_err(|inner| {
NewEmbedderError::deserialize_config(
options.model.clone(),
config,
config_filename,
inner,
)
})?;
let cfg_val: serde_json::Value = match serde_json::from_str(&config_str) {
Ok(v) => v,
Err(inner) => {
return Err(NewEmbedderError::deserialize_config(
options.model.clone(),
config_str.clone(),
config_filename.clone(),
inner,
));
}
};
let model_type = cfg_val.get("model_type").and_then(|v| v.as_str()).unwrap_or_default();
let arch_arr = cfg_val.get("architectures").and_then(|v| v.as_array());
let has_arch = |needle: &str| {
model_type.eq_ignore_ascii_case(needle)
|| arch_arr.is_some_and(|arr| {
arr.iter().filter_map(|v| v.as_str()).any(|s| s.to_lowercase().contains(needle))
})
};
let is_modern = has_arch("modernbert");
tracing::debug!(is_modern, model_type, "detected HF architecture");
let mut tokenizer = Tokenizer::from_file(&tokenizer_filename)
.map_err(|inner| NewEmbedderError::open_tokenizer(tokenizer_filename, inner))?;
let weights_filename = if is_modern && weight_source == WeightSource::Safetensors {
change_tensor_names(&weights_filename)?
} else {
weights_filename
};
let vb = match weight_source {
WeightSource::Pytorch => VarBuilder::from_pth(&weights_filename, DTYPE, &device)
.map_err(NewEmbedderError::pytorch_weight)?,
@@ -244,7 +326,31 @@ impl Embedder {
tracing::debug!(model = options.model, weight=?weight_source, pooling=?pooling, "model config");
let model = BertModel::load(vb, &config).map_err(NewEmbedderError::load_model)?;
// max length from config, fallback to 512
let max_len =
cfg_val.get("max_position_embeddings").and_then(|v| v.as_u64()).unwrap_or(512) as usize;
let model = if is_modern {
let config: ModernConfig = serde_json::from_str(&config_str).map_err(|inner| {
NewEmbedderError::deserialize_config(
options.model.clone(),
config_str.clone(),
config_filename.clone(),
inner,
)
})?;
ModelKind::Modern(ModernBert::load(vb, &config).map_err(NewEmbedderError::load_model)?)
} else {
let config: BertConfig = serde_json::from_str(&config_str).map_err(|inner| {
NewEmbedderError::deserialize_config(
options.model.clone(),
config_str.clone(),
config_filename.clone(),
inner,
)
})?;
ModelKind::Bert(BertModel::load(vb, &config).map_err(NewEmbedderError::load_model)?)
};
if let Some(pp) = tokenizer.get_padding_mut() {
pp.strategy = tokenizers::PaddingStrategy::BatchLongest
@@ -263,6 +369,8 @@ impl Embedder {
dimensions: 0,
pooling,
cache: EmbeddingCache::new(cache_cap),
device,
max_len,
};
let embeddings = this
@@ -321,15 +429,29 @@ impl Embedder {
pub fn embed_one(&self, text: &str) -> std::result::Result<Embedding, EmbedError> {
let tokens = self.tokenizer.encode(text, true).map_err(EmbedError::tokenize)?;
let token_ids = tokens.get_ids();
let token_ids = if token_ids.len() > 512 { &token_ids[..512] } else { token_ids };
let token_ids =
Tensor::new(token_ids, &self.model.device).map_err(EmbedError::tensor_shape)?;
if token_ids.len() > self.max_len { &token_ids[..self.max_len] } else { token_ids };
let token_ids = Tensor::new(token_ids, &self.device).map_err(EmbedError::tensor_shape)?;
let token_ids = Tensor::stack(&[token_ids], 0).map_err(EmbedError::tensor_shape)?;
let token_type_ids = token_ids.zeros_like().map_err(EmbedError::tensor_shape)?;
let embeddings = self
.model
.forward(&token_ids, &token_type_ids, None)
.map_err(EmbedError::model_forward)?;
let embeddings = match &self.model {
ModelKind::Bert(model) => {
let token_type_ids = token_ids.zeros_like().map_err(EmbedError::tensor_shape)?;
model
.forward(&token_ids, &token_type_ids, None)
.map_err(EmbedError::model_forward)?
}
ModelKind::Modern(model) => {
let mut mask_vec = tokens.get_attention_mask().to_vec();
if mask_vec.len() > self.max_len {
mask_vec.truncate(self.max_len);
}
let mask = Tensor::new(mask_vec.as_slice(), &self.device)
.map_err(EmbedError::tensor_shape)?;
let mask = Tensor::stack(&[mask], 0).map_err(EmbedError::tensor_shape)?;
model.forward(&token_ids, &mask).map_err(EmbedError::model_forward)?
}
};
let embedding = Self::pooling(embeddings, self.pooling)?;

View File

@@ -91,6 +91,7 @@ struct EmbedderData {
request: RequestData,
response: Response,
configuration_source: ConfigurationSource,
max_retry_duration: std::time::Duration,
}
#[derive(Debug)]
@@ -182,10 +183,15 @@ impl Embedder {
) -> Result<Self, NewEmbedderError> {
let bearer = options.api_key.as_deref().map(|api_key| format!("Bearer {api_key}"));
let timeout = std::env::var("MEILI_EXPERIMENTAL_REST_EMBEDDER_TIMEOUT_SECONDS")
.ok()
.map(|p| p.parse().unwrap())
.unwrap_or(30);
let client = ureq::AgentBuilder::new()
.max_idle_connections(REQUEST_PARALLELISM * 2)
.max_idle_connections_per_host(REQUEST_PARALLELISM * 2)
.timeout(std::time::Duration::from_secs(30))
.timeout(std::time::Duration::from_secs(timeout))
.build();
let request = RequestData::new(
@@ -196,6 +202,14 @@ impl Embedder {
let response = Response::new(options.response, &request)?;
let max_retry_duration =
std::env::var("MEILI_EXPERIMENTAL_REST_EMBEDDER_MAX_RETRY_DURATION_SECONDS")
.ok()
.map(|p| p.parse().unwrap())
.unwrap_or(60);
let max_retry_duration = std::time::Duration::from_secs(max_retry_duration);
let data = EmbedderData {
client,
bearer,
@@ -204,6 +218,7 @@ impl Embedder {
response,
configuration_source,
headers: options.headers,
max_retry_duration,
};
let dimensions = if let Some(dimensions) = options.dimensions {
@@ -457,7 +472,7 @@ where
}
}?;
let retry_duration = retry_duration.min(std::time::Duration::from_secs(60)); // don't wait more than a minute
let retry_duration = retry_duration.min(data.max_retry_duration); // don't wait more than the max duration
// randomly up to double the retry duration
let retry_duration = retry_duration

View File

@@ -550,9 +550,9 @@ pub struct DeserializePoolingConfig {
#[derive(Debug, thiserror::Error)]
#[error("model `{model_name}` appears to be unsupported{}\n - inner error: {inner}",
if architectures.is_empty() {
"\n - Note: only models with architecture \"BertModel\" are supported.".to_string()
"\n - Note: only models with architecture \"BertModel\" or \"ModernBert\" are supported.".to_string()
} else {
format!("\n - Note: model has declared architectures `{architectures:?}`, only models with architecture `\"BertModel\"` are supported.")
format!("\n - Note: model has declared architectures `{architectures:?}`, only models with architecture `\"BertModel\"` or `\"ModernBert\"` are supported.")
})]
pub struct UnsupportedModel {
pub model_name: String,

View File

@@ -44,6 +44,7 @@ pub struct EmbedSession<'doc, C, I> {
embedder_name: &'doc str,
embedder_stats: &'doc EmbedderStats,
ignore_embedding_failures: bool,
on_embed: C,
}
@@ -87,6 +88,7 @@ impl<'doc, C: OnEmbed<'doc>, I: Input> EmbedSession<'doc, C, I> {
threads: &'doc ThreadPoolNoAbort,
doc_alloc: &'doc Bump,
embedder_stats: &'doc EmbedderStats,
ignore_embedding_failures: bool,
on_embed: C,
) -> Self {
let capacity = embedder.prompt_count_in_chunk_hint() * embedder.chunk_count_hint();
@@ -99,6 +101,7 @@ impl<'doc, C: OnEmbed<'doc>, I: Input> EmbedSession<'doc, C, I> {
threads,
embedder_name,
embedder_stats,
ignore_embedding_failures,
on_embed,
}
}
@@ -109,13 +112,12 @@ impl<'doc, C: OnEmbed<'doc>, I: Input> EmbedSession<'doc, C, I> {
rendered: I,
unused_vectors_distribution: &C::ErrorMetadata,
) -> Result<()> {
if self.inputs.len() < self.inputs.capacity() {
self.inputs.push(rendered);
self.metadata.push(metadata);
return Ok(());
if self.inputs.len() >= self.inputs.capacity() {
self.embed_chunks(unused_vectors_distribution)?;
}
self.embed_chunks(unused_vectors_distribution)
self.inputs.push(rendered);
self.metadata.push(metadata);
Ok(())
}
pub fn drain(mut self, unused_vectors_distribution: &C::ErrorMetadata) -> Result<C> {
@@ -144,24 +146,33 @@ impl<'doc, C: OnEmbed<'doc>, I: Input> EmbedSession<'doc, C, I> {
Ok(())
}
Err(error) => {
// reset metadata and inputs, and send metadata to the error processing.
// send metadata to the error processing.
let doc_alloc = self.metadata.bump();
let metadata = std::mem::replace(
&mut self.metadata,
BVec::with_capacity_in(self.inputs.capacity(), doc_alloc),
);
self.inputs.clear();
return Err(self.on_embed.process_embedding_error(
Err(self.on_embed.process_embedding_error(
error,
self.embedder_name,
unused_vectors_distribution,
metadata,
));
))
}
};
self.inputs.clear();
self.metadata.clear();
res
if self.ignore_embedding_failures {
if let Err(err) = res {
tracing::warn!(
%err,
"ignored error embedding batch of documents due to failure policy"
);
}
Ok(())
} else {
res
}
}
pub(crate) fn embedder_name(&self) -> &'doc str {