Compare commits

...

24 Commits

Author SHA1 Message Date
885b9f07b1 Merge #4949
4949: Fix swedish language support v1.10 r=Kerollmops a=ManyTheFish

# Pull Request

Cherry-picked commits from https://github.com/meilisearch/meilisearch/pull/4945 for v1.10.2


Co-authored-by: ManyTheFish <many@meilisearch.com>
2024-09-23 16:19:53 +00:00
7d0f532dba fix tests 2024-09-23 17:13:27 +02:00
73c1ee65a2 Merge #4951
4951: Update version for the next release (v1.10.2) in Cargo.toml r=dureuill a=ManyTheFish

# Pull Request

Update Meilisearch v1.10.2

Co-authored-by: ManyTheFish <ManyTheFish@users.noreply.github.com>
2024-09-23 14:51:07 +00:00
10a150da93 Update Charabia v0.9.1 2024-09-23 07:55:44 +02:00
a2451b7683 Update version for the next release (v1.10.2) in Cargo.toml 2024-09-23 05:47:44 +00:00
cfa82ab1bb Add Swedish pipeline in all-tokenization feature 2024-09-23 07:30:03 +02:00
3f517dfae6 Merge #4905
4905: Do not fail the whole batch when a single document deletion by filter fails r=dureuill a=irevoire

# Pull Request

## Related issue
Fixes a small bug introduced by https://github.com/meilisearch/meilisearch/pull/4901 where a document deletion by filter could fail a whole batch of document deletion task.

## What does this PR do?
- When a document deletion by filter contains an invalid filter, only fails this task instead of the whole batch
- Adds a big test with multiple document deletions batched together ensuring everything works well


Co-authored-by: Tamo <tamo@meilisearch.com>
2024-09-02 09:00:11 +00:00
11a8e537ed add the snapshots 2024-09-02 10:53:33 +02:00
959aeb0df3 Do not fail the whole batch when a single document deletion by filter fails 2024-09-02 10:34:28 +02:00
4daa27a8e8 Merge #4901
4901: Autobatch document deletion by filter r=dureuill a=irevoire

# Pull Request

## Related issue
Fixes https://github.com/meilisearch/meilisearch/issues/4897

## What does this PR do?
- Enable autobatching of document deletion by filter with:
  - Document deletion by filter
  - Document deletion
  - Document clear
  - Index deletion


Co-authored-by: Tamo <tamo@meilisearch.com>
2024-08-29 15:46:16 +00:00
34ebcd378e autobatch document deletion by filter 2024-08-29 16:50:05 +02:00
5fed4d035a Merge #4899
4899: stop trying to process searches after one minute r=ManyTheFish a=irevoire

# Pull Request

## Related issue
May be related to #4654 and https://github.com/meilisearch/meilisearch-support/issues/350

## What does this PR do?
- If we've been waiting for one whole minute for a search to process, we cancel it
- Ideally we should check if the connection was closed instead but that’s not possible currently: https://github.com/actix/actix-web/issues/3462


Co-authored-by: Tamo <tamo@meilisearch.com>
2024-08-29 07:10:42 +00:00
11bee34bf0 stop trying to process searches after one minute 2024-08-28 19:01:54 +02:00
541a23b17a Merge #4898
4898: Explicitely drop the search permits r=ManyTheFish a=irevoire

# Pull Request

## Related issue
May be related to #4654 and https://github.com/meilisearch/meilisearch-support/issues/350

## What does this PR do?
- Stop spawning a tokio task that is not immediately scheduled and instead explicitly drop the search permit

This should make new search requests to be scheduled quicker than before and reduce the general load on tokio

Co-authored-by: Tamo <tamo@meilisearch.com>
2024-08-28 13:33:38 +00:00
69ab09c149 ensure we never early exit when we have a permit and remove the warning when we implicitely drop a permit 2024-08-28 15:17:10 +02:00
25534cc794 Merge #4895
4895: Update version for the next release (v1.10.1) in Cargo.toml r=irevoire a=meili-bot

⚠️ This PR is automatically generated. Check the new version is the expected one and Cargo.lock has been updated before merging.

Co-authored-by: irevoire <irevoire@users.noreply.github.com>
2024-08-28 12:52:29 +00:00
17d0b10765 Update version for the next release (v1.10.1) in Cargo.toml 2024-08-28 14:51:53 +02:00
241746e7f4 add a warning to help us find when we forget to drop explicitely drop a permit 2024-08-28 14:37:55 +02:00
1b90e6ce5f explicitely drop the search permit 2024-08-28 14:29:25 +02:00
c10d06febc Merge #4896
4896: Make sure the index scheduler never stops running r=dureuill a=irevoire

# Pull Request

## Related issue
Fixes #4748 for the v1.10.1

I cherry-picked the commits from https://github.com/meilisearch/meilisearch/pull/4861

Co-authored-by: Tamo <tamo@meilisearch.com>
2024-08-28 07:46:05 +00:00
9eee467226 Merge #4893
4893: Only spawn one search queue in actix-web r=dureuill a=irevoire

# Pull Request

## Related issue
May be related to #4654 and https://github.com/meilisearch/meilisearch-support/issues/350

## What does this PR do?
- We noticed a bug where multiple search queue were spawned instead of one


Co-authored-by: Tamo <tamo@meilisearch.com>
2024-08-27 20:26:59 +00:00
1cc6ac089b ensure the run function doesn't panic even if the tick function does 2024-08-27 18:26:13 +02:00
f85e091cb4 make sure the index scheduler never stops running 2024-08-27 18:26:05 +02:00
99fdccdc7e Only spawn one search queue in actix-web 2024-08-27 18:19:06 +02:00
25 changed files with 783 additions and 214 deletions

38
Cargo.lock generated
View File

@ -471,7 +471,7 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
[[package]]
name = "benchmarks"
version = "1.10.0"
version = "1.10.2"
dependencies = [
"anyhow",
"bytes",
@ -652,7 +652,7 @@ dependencies = [
[[package]]
name = "build-info"
version = "1.10.0"
version = "1.10.2"
dependencies = [
"anyhow",
"time",
@ -933,9 +933,9 @@ dependencies = [
[[package]]
name = "charabia"
version = "0.9.0"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03cd8f290cae94934cdd0103c14c2de9faf2d7d85be0d24d511af2bf1b14119d"
checksum = "55ff52497324e7d168505a16949ae836c14595606fab94687238d2f6c8d4c798"
dependencies = [
"aho-corasick",
"csv",
@ -1622,7 +1622,7 @@ dependencies = [
[[package]]
name = "dump"
version = "1.10.0"
version = "1.10.2"
dependencies = [
"anyhow",
"big_s",
@ -1834,7 +1834,7 @@ checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a"
[[package]]
name = "file-store"
version = "1.10.0"
version = "1.10.2"
dependencies = [
"tempfile",
"thiserror",
@ -1856,7 +1856,7 @@ dependencies = [
[[package]]
name = "filter-parser"
version = "1.10.0"
version = "1.10.2"
dependencies = [
"insta",
"nom",
@ -1876,7 +1876,7 @@ dependencies = [
[[package]]
name = "flatten-serde-json"
version = "1.10.0"
version = "1.10.2"
dependencies = [
"criterion",
"serde_json",
@ -2000,7 +2000,7 @@ dependencies = [
[[package]]
name = "fuzzers"
version = "1.10.0"
version = "1.10.2"
dependencies = [
"arbitrary",
"clap",
@ -2552,7 +2552,7 @@ checksum = "206ca75c9c03ba3d4ace2460e57b189f39f43de612c2f85836e65c929701bb2d"
[[package]]
name = "index-scheduler"
version = "1.10.0"
version = "1.10.2"
dependencies = [
"anyhow",
"arroy",
@ -2746,7 +2746,7 @@ dependencies = [
[[package]]
name = "json-depth-checker"
version = "1.10.0"
version = "1.10.2"
dependencies = [
"criterion",
"serde_json",
@ -3365,7 +3365,7 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "meili-snap"
version = "1.10.0"
version = "1.10.2"
dependencies = [
"insta",
"md5",
@ -3374,7 +3374,7 @@ dependencies = [
[[package]]
name = "meilisearch"
version = "1.10.0"
version = "1.10.2"
dependencies = [
"actix-cors",
"actix-http",
@ -3463,7 +3463,7 @@ dependencies = [
[[package]]
name = "meilisearch-auth"
version = "1.10.0"
version = "1.10.2"
dependencies = [
"base64 0.22.1",
"enum-iterator",
@ -3482,7 +3482,7 @@ dependencies = [
[[package]]
name = "meilisearch-types"
version = "1.10.0"
version = "1.10.2"
dependencies = [
"actix-web",
"anyhow",
@ -3512,7 +3512,7 @@ dependencies = [
[[package]]
name = "meilitool"
version = "1.10.0"
version = "1.10.2"
dependencies = [
"anyhow",
"clap",
@ -3542,7 +3542,7 @@ dependencies = [
[[package]]
name = "milli"
version = "1.10.0"
version = "1.10.2"
dependencies = [
"arroy",
"big_s",
@ -3976,7 +3976,7 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
name = "permissive-json-pointer"
version = "1.10.0"
version = "1.10.2"
dependencies = [
"big_s",
"serde_json",
@ -6361,7 +6361,7 @@ dependencies = [
[[package]]
name = "xtask"
version = "1.10.0"
version = "1.10.2"
dependencies = [
"anyhow",
"build-info",

View File

@ -22,7 +22,7 @@ members = [
]
[workspace.package]
version = "1.10.0"
version = "1.10.2"
authors = [
"Quentin de Quelen <quentin@dequelen.me>",
"Clément Renault <clement@meilisearch.com>",

View File

@ -25,8 +25,9 @@ enum AutobatchKind {
primary_key: Option<String>,
},
DocumentEdition,
DocumentDeletion,
DocumentDeletionByFilter,
DocumentDeletion {
by_filter: bool,
},
DocumentClear,
Settings {
allow_index_creation: bool,
@ -65,10 +66,12 @@ impl From<KindWithContent> for AutobatchKind {
..
} => AutobatchKind::DocumentImport { method, allow_index_creation, primary_key },
KindWithContent::DocumentEdition { .. } => AutobatchKind::DocumentEdition,
KindWithContent::DocumentDeletion { .. } => AutobatchKind::DocumentDeletion,
KindWithContent::DocumentDeletion { .. } => {
AutobatchKind::DocumentDeletion { by_filter: false }
}
KindWithContent::DocumentClear { .. } => AutobatchKind::DocumentClear,
KindWithContent::DocumentDeletionByFilter { .. } => {
AutobatchKind::DocumentDeletionByFilter
AutobatchKind::DocumentDeletion { by_filter: true }
}
KindWithContent::SettingsUpdate { allow_index_creation, is_deletion, .. } => {
AutobatchKind::Settings {
@ -105,9 +108,7 @@ pub enum BatchKind {
},
DocumentDeletion {
deletion_ids: Vec<TaskId>,
},
DocumentDeletionByFilter {
id: TaskId,
includes_by_filter: bool,
},
ClearAndSettings {
other: Vec<TaskId>,
@ -205,12 +206,13 @@ impl BatchKind {
allow_index_creation,
),
K::DocumentEdition => (Break(BatchKind::DocumentEdition { id: task_id }), false),
K::DocumentDeletion => {
(Continue(BatchKind::DocumentDeletion { deletion_ids: vec![task_id] }), false)
}
K::DocumentDeletionByFilter => {
(Break(BatchKind::DocumentDeletionByFilter { id: task_id }), false)
}
K::DocumentDeletion { by_filter: includes_by_filter } => (
Continue(BatchKind::DocumentDeletion {
deletion_ids: vec![task_id],
includes_by_filter,
}),
false,
),
K::Settings { allow_index_creation } => (
Continue(BatchKind::Settings { allow_index_creation, settings_ids: vec![task_id] }),
allow_index_creation,
@ -228,7 +230,7 @@ impl BatchKind {
match (self, kind) {
// We don't batch any of these operations
(this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentEdition | K::DocumentDeletionByFilter) => Break(this),
(this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentEdition) => Break(this),
// We must not batch tasks that don't have the same index creation rights if the index doesn't already exists.
(this, kind) if !index_already_exists && this.allow_index_creation() == Some(false) && kind.allow_index_creation() == Some(true) => {
Break(this)
@ -264,7 +266,7 @@ impl BatchKind {
// The index deletion can batch with everything but must stop after
(
BatchKind::DocumentClear { mut ids }
| BatchKind::DocumentDeletion { deletion_ids: mut ids }
| BatchKind::DocumentDeletion { deletion_ids: mut ids, includes_by_filter: _ }
| BatchKind::DocumentOperation { method: _, allow_index_creation: _, primary_key: _, operation_ids: mut ids }
| BatchKind::Settings { allow_index_creation: _, settings_ids: mut ids },
K::IndexDeletion,
@ -284,7 +286,7 @@ impl BatchKind {
(
BatchKind::DocumentClear { mut ids },
K::DocumentClear | K::DocumentDeletion,
K::DocumentClear | K::DocumentDeletion { by_filter: _ },
) => {
ids.push(id);
Continue(BatchKind::DocumentClear { ids })
@ -328,7 +330,7 @@ impl BatchKind {
}
(
BatchKind::DocumentOperation { method, allow_index_creation, primary_key, mut operation_ids },
K::DocumentDeletion,
K::DocumentDeletion { by_filter: false },
) => {
operation_ids.push(id);
@ -339,6 +341,13 @@ impl BatchKind {
operation_ids,
})
}
// We can't batch a document operation with a delete by filter
(
this @ BatchKind::DocumentOperation { .. },
K::DocumentDeletion { by_filter: true },
) => {
Break(this)
}
// but we can't autobatch documents if it's not the same kind
// this match branch MUST be AFTER the previous one
(
@ -357,13 +366,18 @@ impl BatchKind {
operation_ids,
}),
(BatchKind::DocumentDeletion { mut deletion_ids }, K::DocumentClear) => {
(BatchKind::DocumentDeletion { mut deletion_ids, includes_by_filter: _ }, K::DocumentClear) => {
deletion_ids.push(id);
Continue(BatchKind::DocumentClear { ids: deletion_ids })
}
// we can't autobatch the deletion and import if the document deletion contained a filter
(
this @ BatchKind::DocumentDeletion { deletion_ids: _, includes_by_filter: true },
K::DocumentImport { .. }
) => Break(this),
// we can autobatch the deletion and import if the index already exists
(
BatchKind::DocumentDeletion { mut deletion_ids },
BatchKind::DocumentDeletion { mut deletion_ids, includes_by_filter: false },
K::DocumentImport { method, allow_index_creation, primary_key }
) if index_already_exists => {
deletion_ids.push(id);
@ -377,7 +391,7 @@ impl BatchKind {
}
// we can autobatch the deletion and import if both can't create an index
(
BatchKind::DocumentDeletion { mut deletion_ids },
BatchKind::DocumentDeletion { mut deletion_ids, includes_by_filter: false },
K::DocumentImport { method, allow_index_creation, primary_key }
) if !allow_index_creation => {
deletion_ids.push(id);
@ -396,9 +410,9 @@ impl BatchKind {
) => {
Break(this)
}
(BatchKind::DocumentDeletion { mut deletion_ids }, K::DocumentDeletion) => {
(BatchKind::DocumentDeletion { mut deletion_ids, includes_by_filter }, K::DocumentDeletion { by_filter }) => {
deletion_ids.push(id);
Continue(BatchKind::DocumentDeletion { deletion_ids })
Continue(BatchKind::DocumentDeletion { deletion_ids, includes_by_filter: includes_by_filter | by_filter })
}
(this @ BatchKind::DocumentDeletion { .. }, K::Settings { .. }) => Break(this),
@ -412,7 +426,7 @@ impl BatchKind {
}),
(
this @ BatchKind::Settings { .. },
K::DocumentImport { .. } | K::DocumentDeletion,
K::DocumentImport { .. } | K::DocumentDeletion { .. },
) => Break(this),
(
BatchKind::Settings { mut settings_ids, allow_index_creation },
@ -443,7 +457,7 @@ impl BatchKind {
settings_ids,
allow_index_creation,
},
K::DocumentDeletion,
K::DocumentDeletion { .. },
) => {
other.push(id);
Continue(BatchKind::ClearAndSettings {
@ -505,7 +519,7 @@ impl BatchKind {
// this MUST be AFTER the two previous branch
(
this @ BatchKind::SettingsAndDocumentOperation { .. },
K::DocumentDeletion | K::DocumentImport { .. },
K::DocumentDeletion { .. } | K::DocumentImport { .. },
) => Break(this),
(
BatchKind::SettingsAndDocumentOperation { mut settings_ids, method, allow_index_creation,primary_key, operation_ids },
@ -525,8 +539,7 @@ impl BatchKind {
| BatchKind::IndexDeletion { .. }
| BatchKind::IndexUpdate { .. }
| BatchKind::IndexSwap { .. }
| BatchKind::DocumentEdition { .. }
| BatchKind::DocumentDeletionByFilter { .. },
| BatchKind::DocumentEdition { .. },
_,
) => {
unreachable!()
@ -616,6 +629,13 @@ mod tests {
}
}
fn doc_del_fil() -> KindWithContent {
KindWithContent::DocumentDeletionByFilter {
index_uid: String::from("doggo"),
filter_expr: serde_json::json!("cuteness > 100"),
}
}
fn doc_clr() -> KindWithContent {
KindWithContent::DocumentClear { index_uid: String::from("doggo") }
}
@ -676,10 +696,16 @@ mod tests {
debug_snapshot!(autobatch_from(false,None, [doc_imp(UpdateDocuments, false, None), doc_imp(UpdateDocuments, false, None), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1, 2] }, false))");
// we can autobatch one or multiple DocumentDeletion together
debug_snapshot!(autobatch_from(true, None, [doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_del(), doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0, 1, 2] }, false))");
debug_snapshot!(autobatch_from(false,None, [doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(false,None, [doc_del(), doc_del(), doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0, 1, 2] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_del(), doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0, 1, 2], includes_by_filter: false }, false))");
debug_snapshot!(autobatch_from(false,None, [doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))");
debug_snapshot!(autobatch_from(false,None, [doc_del(), doc_del(), doc_del()]), @"Some((DocumentDeletion { deletion_ids: [0, 1, 2], includes_by_filter: false }, false))");
// we can autobatch one or multiple DocumentDeletionByFilter together
debug_snapshot!(autobatch_from(true, None, [doc_del_fil()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_del_fil(), doc_del_fil()]), @"Some((DocumentDeletion { deletion_ids: [0, 1, 2], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(false,None, [doc_del_fil()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(false,None, [doc_del_fil(), doc_del_fil(), doc_del_fil()]), @"Some((DocumentDeletion { deletion_ids: [0, 1, 2], includes_by_filter: true }, false))");
// we can autobatch one or multiple Settings together
debug_snapshot!(autobatch_from(true, None, [settings(true)]), @"Some((Settings { allow_index_creation: true, settings_ids: [0] }, true))");
@ -722,25 +748,63 @@ mod tests {
debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###);
debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###);
// But we can't autobatch document addition with document deletion by filter
debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_del_fil()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), doc_del_fil()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, None), doc_del_fil()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, None), doc_del_fil()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###);
debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###);
debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###);
debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###);
debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, true, None), doc_del_fil()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, true, None), doc_del_fil()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, false, None), doc_del_fil()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, false, None), doc_del_fil()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, true, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###);
debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, true, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###);
debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, false, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###);
debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, false, Some("catto")), doc_del_fil()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###);
// And the other way around
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(ReplaceDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(UpdateDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(UpdateDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del_fil(), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del_fil(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del_fil(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del_fil(), doc_imp(UpdateDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
}
#[test]
fn simple_document_operation_dont_autobatch_with_other() {
// addition, updates and deletion can't batch together
// addition, updates and deletion by filter can't batch together
debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), doc_del_fil()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_del_fil()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), idx_create()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), idx_create()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_create()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_create()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), idx_create()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), idx_update()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), idx_update()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_update()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_update()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), idx_update()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), idx_swap()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), idx_swap()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_swap()]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_swap()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), idx_swap()]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: true }, false))");
}
#[test]
@ -807,6 +871,7 @@ mod tests {
debug_snapshot!(autobatch_from(true, None, [idx_del(), doc_imp(ReplaceDocuments, false, None)]), @"Some((IndexDeletion { ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [idx_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((IndexDeletion { ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [idx_del(), doc_del()]), @"Some((IndexDeletion { ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [idx_del(), doc_del_fil()]), @"Some((IndexDeletion { ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [idx_del(), doc_clr()]), @"Some((IndexDeletion { ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [idx_del(), settings(true)]), @"Some((IndexDeletion { ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [idx_del(), settings(false)]), @"Some((IndexDeletion { ids: [0] }, false))");
@ -816,6 +881,7 @@ mod tests {
debug_snapshot!(autobatch_from(false,None, [idx_del(), doc_imp(ReplaceDocuments, false, None)]), @"Some((IndexDeletion { ids: [0] }, false))");
debug_snapshot!(autobatch_from(false,None, [idx_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((IndexDeletion { ids: [0] }, false))");
debug_snapshot!(autobatch_from(false,None, [idx_del(), doc_del()]), @"Some((IndexDeletion { ids: [0] }, false))");
debug_snapshot!(autobatch_from(false,None, [idx_del(), doc_del_fil()]), @"Some((IndexDeletion { ids: [0] }, false))");
debug_snapshot!(autobatch_from(false,None, [idx_del(), doc_clr()]), @"Some((IndexDeletion { ids: [0] }, false))");
debug_snapshot!(autobatch_from(false,None, [idx_del(), settings(true)]), @"Some((IndexDeletion { ids: [0] }, false))");
debug_snapshot!(autobatch_from(false,None, [idx_del(), settings(false)]), @"Some((IndexDeletion { ids: [0] }, false))");
@ -827,6 +893,7 @@ mod tests {
debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, None), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, None), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del(), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del_fil(), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_clr(), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))");
debug_snapshot!(autobatch_from(true, None, [settings(true), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, true))");
debug_snapshot!(autobatch_from(true, None, [settings(false), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))");
@ -836,6 +903,7 @@ mod tests {
debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments, false, None), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))");
debug_snapshot!(autobatch_from(false,None, [doc_imp(UpdateDocuments, false, None), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))");
debug_snapshot!(autobatch_from(false,None, [doc_del(), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))");
debug_snapshot!(autobatch_from(false,None, [doc_del_fil(), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))");
debug_snapshot!(autobatch_from(false,None, [doc_clr(), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))");
debug_snapshot!(autobatch_from(false,None, [settings(true), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, true))");
debug_snapshot!(autobatch_from(false,None, [settings(false), idx_del()]), @"Some((IndexDeletion { ids: [0, 1] }, false))");
@ -901,10 +969,10 @@ mod tests {
debug_snapshot!(autobatch_from(false,None, [doc_imp(ReplaceDocuments, false, None), settings(true)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))");
// batch deletion and addition
debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0], includes_by_filter: false }, false))");
}
#[test]

View File

@ -110,9 +110,9 @@ pub(crate) enum IndexOperation {
index_uid: String,
task: Task,
},
IndexDocumentDeletionByFilter {
DocumentDeletion {
index_uid: String,
task: Task,
tasks: Vec<Task>,
},
DocumentClear {
index_uid: String,
@ -165,11 +165,11 @@ impl Batch {
Batch::IndexOperation { op, .. } => match op {
IndexOperation::DocumentOperation { tasks, .. }
| IndexOperation::Settings { tasks, .. }
| IndexOperation::DocumentDeletion { tasks, .. }
| IndexOperation::DocumentClear { tasks, .. } => {
RoaringBitmap::from_iter(tasks.iter().map(|task| task.uid))
}
IndexOperation::DocumentEdition { task, .. }
| IndexOperation::IndexDocumentDeletionByFilter { task, .. } => {
IndexOperation::DocumentEdition { task, .. } => {
RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap()
}
IndexOperation::SettingsAndDocumentOperation {
@ -234,7 +234,7 @@ impl IndexOperation {
match self {
IndexOperation::DocumentOperation { index_uid, .. }
| IndexOperation::DocumentEdition { index_uid, .. }
| IndexOperation::IndexDocumentDeletionByFilter { index_uid, .. }
| IndexOperation::DocumentDeletion { index_uid, .. }
| IndexOperation::DocumentClear { index_uid, .. }
| IndexOperation::Settings { index_uid, .. }
| IndexOperation::DocumentClearAndSetting { index_uid, .. }
@ -252,8 +252,8 @@ impl fmt::Display for IndexOperation {
IndexOperation::DocumentEdition { .. } => {
f.write_str("IndexOperation::DocumentEdition")
}
IndexOperation::IndexDocumentDeletionByFilter { .. } => {
f.write_str("IndexOperation::IndexDocumentDeletionByFilter")
IndexOperation::DocumentDeletion { .. } => {
f.write_str("IndexOperation::DocumentDeletion")
}
IndexOperation::DocumentClear { .. } => f.write_str("IndexOperation::DocumentClear"),
IndexOperation::Settings { .. } => f.write_str("IndexOperation::Settings"),
@ -289,21 +289,6 @@ impl IndexScheduler {
},
must_create_index,
})),
BatchKind::DocumentDeletionByFilter { id } => {
let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
match &task.kind {
KindWithContent::DocumentDeletionByFilter { index_uid, .. } => {
Ok(Some(Batch::IndexOperation {
op: IndexOperation::IndexDocumentDeletionByFilter {
index_uid: index_uid.clone(),
task,
},
must_create_index: false,
}))
}
_ => unreachable!(),
}
}
BatchKind::DocumentEdition { id } => {
let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
match &task.kind {
@ -366,30 +351,11 @@ impl IndexScheduler {
must_create_index,
}))
}
BatchKind::DocumentDeletion { deletion_ids } => {
BatchKind::DocumentDeletion { deletion_ids, includes_by_filter: _ } => {
let tasks = self.get_existing_tasks(rtxn, deletion_ids)?;
let mut operations = Vec::with_capacity(tasks.len());
let mut documents_counts = Vec::with_capacity(tasks.len());
for task in &tasks {
match task.kind {
KindWithContent::DocumentDeletion { ref documents_ids, .. } => {
operations.push(DocumentOperation::Delete(documents_ids.clone()));
documents_counts.push(documents_ids.len() as u64);
}
_ => unreachable!(),
}
}
Ok(Some(Batch::IndexOperation {
op: IndexOperation::DocumentOperation {
index_uid,
primary_key: None,
method: IndexDocumentsMethod::ReplaceDocuments,
documents_counts,
operations,
tasks,
},
op: IndexOperation::DocumentDeletion { index_uid, tasks },
must_create_index,
}))
}
@ -1439,7 +1405,7 @@ impl IndexScheduler {
{
(original_filter, context, function)
} else {
// In the case of a `documentDeleteByFilter` the details MUST be set
// In the case of a `documentEdition` the details MUST be set
unreachable!();
};
@ -1469,52 +1435,102 @@ impl IndexScheduler {
Ok(vec![task])
}
IndexOperation::IndexDocumentDeletionByFilter { mut task, index_uid: _ } => {
let filter =
if let KindWithContent::DocumentDeletionByFilter { filter_expr, .. } =
&task.kind
{
filter_expr
} else {
unreachable!()
};
let deleted_documents = delete_document_by_filter(
index_wtxn,
filter,
self.index_mapper.indexer_config(),
self.must_stop_processing.clone(),
index,
);
let original_filter = if let Some(Details::DocumentDeletionByFilter {
original_filter,
deleted_documents: _,
}) = task.details
{
original_filter
} else {
// In the case of a `documentDeleteByFilter` the details MUST be set
unreachable!();
};
IndexOperation::DocumentDeletion { mut tasks, index_uid: _ } => {
let mut to_delete = RoaringBitmap::new();
let external_documents_ids = index.external_documents_ids();
match deleted_documents {
Ok(deleted_documents) => {
task.status = Status::Succeeded;
task.details = Some(Details::DocumentDeletionByFilter {
original_filter,
deleted_documents: Some(deleted_documents),
});
}
Err(e) => {
task.status = Status::Failed;
task.details = Some(Details::DocumentDeletionByFilter {
original_filter,
deleted_documents: Some(0),
});
task.error = Some(e.into());
for task in tasks.iter_mut() {
let before = to_delete.len();
task.status = Status::Succeeded;
match &task.kind {
KindWithContent::DocumentDeletion { index_uid: _, documents_ids } => {
for id in documents_ids {
if let Some(id) = external_documents_ids.get(index_wtxn, id)? {
to_delete.insert(id);
}
}
let will_be_removed = to_delete.len() - before;
task.details = Some(Details::DocumentDeletion {
provided_ids: documents_ids.len(),
deleted_documents: Some(will_be_removed),
});
}
KindWithContent::DocumentDeletionByFilter { index_uid: _, filter_expr } => {
let before = to_delete.len();
let filter = match Filter::from_json(filter_expr) {
Ok(filter) => filter,
Err(err) => {
// theorically, this should be catched by deserr before reaching the index-scheduler and cannot happens
task.status = Status::Failed;
task.error = match err {
milli::Error::UserError(
milli::UserError::InvalidFilterExpression { .. },
) => Some(
Error::from(err)
.with_custom_error_code(Code::InvalidDocumentFilter)
.into(),
),
e => Some(e.into()),
};
None
}
};
if let Some(filter) = filter {
let candidates =
filter.evaluate(index_wtxn, index).map_err(|err| match err {
milli::Error::UserError(
milli::UserError::InvalidFilter(_),
) => Error::from(err)
.with_custom_error_code(Code::InvalidDocumentFilter),
e => e.into(),
});
match candidates {
Ok(candidates) => to_delete |= candidates,
Err(err) => {
task.status = Status::Failed;
task.error = Some(err.into());
}
};
}
let will_be_removed = to_delete.len() - before;
if let Some(Details::DocumentDeletionByFilter {
original_filter: _,
deleted_documents,
}) = &mut task.details
{
*deleted_documents = Some(will_be_removed);
} else {
// In the case of a `documentDeleteByFilter` the details MUST be set
unreachable!()
}
}
_ => unreachable!(),
}
}
Ok(vec![task])
let config = IndexDocumentsConfig {
update_method: IndexDocumentsMethod::ReplaceDocuments,
..Default::default()
};
let must_stop_processing = self.must_stop_processing.clone();
let mut builder = milli::update::IndexDocuments::new(
index_wtxn,
index,
self.index_mapper.indexer_config(),
config,
|indexing_step| tracing::debug!(update = ?indexing_step),
|| must_stop_processing.get(),
)?;
let (new_builder, _count) =
builder.remove_documents_from_db_no_batch(&to_delete)?;
builder = new_builder;
let _ = builder.execute()?;
Ok(tasks)
}
IndexOperation::Settings { index_uid: _, settings, mut tasks } => {
let indexer_config = self.index_mapper.indexer_config();
@ -1718,46 +1734,6 @@ impl IndexScheduler {
}
}
fn delete_document_by_filter<'a>(
wtxn: &mut RwTxn<'a>,
filter: &serde_json::Value,
indexer_config: &IndexerConfig,
must_stop_processing: MustStopProcessing,
index: &'a Index,
) -> Result<u64> {
let filter = Filter::from_json(filter)?;
Ok(if let Some(filter) = filter {
let candidates = filter.evaluate(wtxn, index).map_err(|err| match err {
milli::Error::UserError(milli::UserError::InvalidFilter(_)) => {
Error::from(err).with_custom_error_code(Code::InvalidDocumentFilter)
}
e => e.into(),
})?;
let config = IndexDocumentsConfig {
update_method: IndexDocumentsMethod::ReplaceDocuments,
..Default::default()
};
let mut builder = milli::update::IndexDocuments::new(
wtxn,
index,
indexer_config,
config,
|indexing_step| tracing::debug!(update = ?indexing_step),
|| must_stop_processing.get(),
)?;
let (new_builder, count) = builder.remove_documents_from_db_no_batch(&candidates)?;
builder = new_builder;
let _ = builder.execute()?;
count
} else {
0
})
}
fn edit_documents_by_function<'a>(
wtxn: &mut RwTxn<'a>,
filter: &Option<serde_json::Value>,

View File

@ -35,6 +35,7 @@ pub type TaskId = u32;
use std::collections::{BTreeMap, HashMap};
use std::io::{self, BufReader, Read};
use std::ops::{Bound, RangeBounds};
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::path::{Path, PathBuf};
use std::sync::atomic::Ordering::{self, Relaxed};
use std::sync::atomic::{AtomicBool, AtomicU32};
@ -612,19 +613,24 @@ impl IndexScheduler {
#[cfg(test)]
run.breakpoint(Breakpoint::Init);
run.wake_up.wait();
run.wake_up.wait_timeout(std::time::Duration::from_secs(60));
loop {
match run.tick() {
Ok(TickOutcome::TickAgain(_)) => (),
Ok(TickOutcome::WaitForSignal) => run.wake_up.wait(),
Err(e) => {
let ret = catch_unwind(AssertUnwindSafe(|| run.tick()));
match ret {
Ok(Ok(TickOutcome::TickAgain(_))) => (),
Ok(Ok(TickOutcome::WaitForSignal)) => run.wake_up.wait(),
Ok(Err(e)) => {
tracing::error!("{e}");
// Wait one second when an irrecoverable error occurs.
if !e.is_recoverable() {
std::thread::sleep(Duration::from_secs(1));
}
}
Err(_panic) => {
tracing::error!("Internal error: Unexpected panic in the `IndexScheduler::run` method.");
}
}
}
})
@ -1758,6 +1764,7 @@ mod tests {
use crossbeam::channel::RecvTimeoutError;
use file_store::File;
use insta::assert_json_snapshot;
use maplit::btreeset;
use meili_snap::{json_string, snapshot};
use meilisearch_auth::AuthFilter;
use meilisearch_types::document_formats::DocumentFormatError;
@ -2547,6 +2554,117 @@ mod tests {
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
}
#[test]
fn fail_in_process_batch_for_document_deletion() {
let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]);
use meilisearch_types::settings::{Settings, Unchecked};
let mut new_settings: Box<Settings<Unchecked>> = Box::default();
new_settings.filterable_attributes = Setting::Set(btreeset!(S("catto")));
index_scheduler
.register(
KindWithContent::SettingsUpdate {
index_uid: S("doggos"),
new_settings,
is_deletion: false,
allow_index_creation: true,
},
None,
false,
)
.unwrap();
let content = r#"[
{ "id": 1, "doggo": "jean bob" },
{ "id": 2, "catto": "jorts" },
{ "id": 3, "doggo": "bork" }
]"#;
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
file.persist().unwrap();
index_scheduler
.register(
KindWithContent::DocumentAdditionOrUpdate {
index_uid: S("doggos"),
primary_key: Some(S("id")),
method: ReplaceDocuments,
content_file: uuid,
documents_count,
allow_index_creation: true,
},
None,
false,
)
.unwrap();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_setting_and_document_addition");
handle.advance_one_successful_batch();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_adding_the_settings");
handle.advance_one_successful_batch();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_adding_the_documents");
index_scheduler
.register(
KindWithContent::DocumentDeletion {
index_uid: S("doggos"),
documents_ids: vec![S("1")],
},
None,
false,
)
.unwrap();
// This one should not be catched by Meilisearch but it's still nice to handle it because if one day we break the filters it could happens
index_scheduler
.register(
KindWithContent::DocumentDeletionByFilter {
index_uid: S("doggos"),
filter_expr: serde_json::json!(true),
},
None,
false,
)
.unwrap();
// Should fail because the ids are not filterable
index_scheduler
.register(
KindWithContent::DocumentDeletionByFilter {
index_uid: S("doggos"),
filter_expr: serde_json::json!("id = 2"),
},
None,
false,
)
.unwrap();
index_scheduler
.register(
KindWithContent::DocumentDeletionByFilter {
index_uid: S("doggos"),
filter_expr: serde_json::json!("catto EXISTS"),
},
None,
false,
)
.unwrap();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_document_deletions");
// Everything should be batched together
handle.advance_one_successful_batch();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_removing_the_documents");
let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap();
let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index
.all_documents(&rtxn)
.unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap())
.collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents_remaining_should_only_be_bork");
}
#[test]
fn do_not_batch_task_of_different_indexes() {
let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]);

View File

@ -0,0 +1,44 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: succeeded, details: { settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: Set({"catto"}), sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: NotSet, search_cutoff_ms: NotSet, localized_attributes: NotSet, _kind: PhantomData<meilisearch_types::settings::Unchecked> } }, kind: SettingsUpdate { index_uid: "doggos", new_settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: Set({"catto"}), sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: NotSet, search_cutoff_ms: NotSet, localized_attributes: NotSet, _kind: PhantomData<meilisearch_types::settings::Unchecked> }, is_deletion: false, allow_index_creation: true }}
1 {uid: 1, status: succeeded, details: { received_documents: 3, indexed_documents: Some(3) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }}
----------------------------------------------------------------------
### Status:
enqueued []
succeeded [0,1,]
----------------------------------------------------------------------
### Kind:
"documentAdditionOrUpdate" [1,]
"settingsUpdate" [0,]
----------------------------------------------------------------------
### Index Tasks:
doggos [0,1,]
----------------------------------------------------------------------
### Index Mapper:
doggos: { number_of_documents: 3, field_distribution: {"catto": 1, "doggo": 2, "id": 3} }
----------------------------------------------------------------------
### Canceled By:
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@ -0,0 +1,43 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: succeeded, details: { settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: Set({"catto"}), sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: NotSet, search_cutoff_ms: NotSet, localized_attributes: NotSet, _kind: PhantomData<meilisearch_types::settings::Unchecked> } }, kind: SettingsUpdate { index_uid: "doggos", new_settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: Set({"catto"}), sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: NotSet, search_cutoff_ms: NotSet, localized_attributes: NotSet, _kind: PhantomData<meilisearch_types::settings::Unchecked> }, is_deletion: false, allow_index_creation: true }}
1 {uid: 1, status: enqueued, details: { received_documents: 3, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }}
----------------------------------------------------------------------
### Status:
enqueued [1,]
succeeded [0,]
----------------------------------------------------------------------
### Kind:
"documentAdditionOrUpdate" [1,]
"settingsUpdate" [0,]
----------------------------------------------------------------------
### Index Tasks:
doggos [0,1,]
----------------------------------------------------------------------
### Index Mapper:
doggos: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
----------------------------------------------------------------------
### File Store:
00000000-0000-0000-0000-000000000000
----------------------------------------------------------------------

View File

@ -0,0 +1,43 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: succeeded, details: { settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: Set({"catto"}), sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: NotSet, search_cutoff_ms: NotSet, localized_attributes: NotSet, _kind: PhantomData<meilisearch_types::settings::Unchecked> } }, kind: SettingsUpdate { index_uid: "doggos", new_settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: Set({"catto"}), sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: NotSet, search_cutoff_ms: NotSet, localized_attributes: NotSet, _kind: PhantomData<meilisearch_types::settings::Unchecked> }, is_deletion: false, allow_index_creation: true }}
1 {uid: 1, status: enqueued, details: { received_documents: 3, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }}
----------------------------------------------------------------------
### Status:
enqueued [1,]
succeeded [0,]
----------------------------------------------------------------------
### Kind:
"documentAdditionOrUpdate" [1,]
"settingsUpdate" [0,]
----------------------------------------------------------------------
### Index Tasks:
doggos [0,1,]
----------------------------------------------------------------------
### Index Mapper:
doggos: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
----------------------------------------------------------------------
### File Store:
00000000-0000-0000-0000-000000000000
----------------------------------------------------------------------

View File

@ -0,0 +1,56 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: succeeded, details: { settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: Set({"catto"}), sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: NotSet, search_cutoff_ms: NotSet, localized_attributes: NotSet, _kind: PhantomData<meilisearch_types::settings::Unchecked> } }, kind: SettingsUpdate { index_uid: "doggos", new_settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: Set({"catto"}), sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: NotSet, search_cutoff_ms: NotSet, localized_attributes: NotSet, _kind: PhantomData<meilisearch_types::settings::Unchecked> }, is_deletion: false, allow_index_creation: true }}
1 {uid: 1, status: succeeded, details: { received_documents: 3, indexed_documents: Some(3) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }}
2 {uid: 2, status: succeeded, details: { received_document_ids: 1, deleted_documents: Some(1) }, kind: DocumentDeletion { index_uid: "doggos", documents_ids: ["1"] }}
3 {uid: 3, status: failed, error: ResponseError { code: 200, message: "Invalid type for filter subexpression: expected: String, Array, found: true.", error_code: "invalid_document_filter", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#invalid_document_filter" }, details: { original_filter: true, deleted_documents: Some(0) }, kind: DocumentDeletionByFilter { index_uid: "doggos", filter_expr: Bool(true) }}
4 {uid: 4, status: failed, error: ResponseError { code: 200, message: "Attribute `id` is not filterable. Available filterable attributes are: `catto`.\n1:3 id = 2", error_code: "invalid_document_filter", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#invalid_document_filter" }, details: { original_filter: "id = 2", deleted_documents: Some(0) }, kind: DocumentDeletionByFilter { index_uid: "doggos", filter_expr: String("id = 2") }}
5 {uid: 5, status: succeeded, details: { original_filter: "catto EXISTS", deleted_documents: Some(1) }, kind: DocumentDeletionByFilter { index_uid: "doggos", filter_expr: String("catto EXISTS") }}
----------------------------------------------------------------------
### Status:
enqueued []
succeeded [0,1,2,5,]
failed [3,4,]
----------------------------------------------------------------------
### Kind:
"documentAdditionOrUpdate" [1,]
"documentDeletion" [2,3,4,5,]
"settingsUpdate" [0,]
----------------------------------------------------------------------
### Index Tasks:
doggos [0,1,2,3,4,5,]
----------------------------------------------------------------------
### Index Mapper:
doggos: { number_of_documents: 1, field_distribution: {"doggo": 1, "id": 1} }
----------------------------------------------------------------------
### Canceled By:
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
[timestamp] [5,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,3,4,5,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,3,4,5,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@ -0,0 +1,9 @@
---
source: index-scheduler/src/lib.rs
---
[
{
"id": 3,
"doggo": "bork"
}
]

View File

@ -0,0 +1,53 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: succeeded, details: { settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: Set({"catto"}), sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: NotSet, search_cutoff_ms: NotSet, localized_attributes: NotSet, _kind: PhantomData<meilisearch_types::settings::Unchecked> } }, kind: SettingsUpdate { index_uid: "doggos", new_settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: Set({"catto"}), sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: NotSet, search_cutoff_ms: NotSet, localized_attributes: NotSet, _kind: PhantomData<meilisearch_types::settings::Unchecked> }, is_deletion: false, allow_index_creation: true }}
1 {uid: 1, status: succeeded, details: { received_documents: 3, indexed_documents: Some(3) }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }}
2 {uid: 2, status: enqueued, details: { received_document_ids: 1, deleted_documents: None }, kind: DocumentDeletion { index_uid: "doggos", documents_ids: ["1"] }}
3 {uid: 3, status: enqueued, details: { original_filter: true, deleted_documents: None }, kind: DocumentDeletionByFilter { index_uid: "doggos", filter_expr: Bool(true) }}
4 {uid: 4, status: enqueued, details: { original_filter: "id = 2", deleted_documents: None }, kind: DocumentDeletionByFilter { index_uid: "doggos", filter_expr: String("id = 2") }}
5 {uid: 5, status: enqueued, details: { original_filter: "catto EXISTS", deleted_documents: None }, kind: DocumentDeletionByFilter { index_uid: "doggos", filter_expr: String("catto EXISTS") }}
----------------------------------------------------------------------
### Status:
enqueued [2,3,4,5,]
succeeded [0,1,]
----------------------------------------------------------------------
### Kind:
"documentAdditionOrUpdate" [1,]
"documentDeletion" [2,3,4,5,]
"settingsUpdate" [0,]
----------------------------------------------------------------------
### Index Tasks:
doggos [0,1,2,3,4,5,]
----------------------------------------------------------------------
### Index Mapper:
doggos: { number_of_documents: 3, field_distribution: {"catto": 1, "doggo": 2, "id": 3} }
----------------------------------------------------------------------
### Canceled By:
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [4,]
[timestamp] [5,]
----------------------------------------------------------------------
### Started At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### File Store:
----------------------------------------------------------------------

View File

@ -0,0 +1,39 @@
---
source: index-scheduler/src/lib.rs
---
### Autobatching Enabled = true
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: Set({"catto"}), sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: NotSet, search_cutoff_ms: NotSet, localized_attributes: NotSet, _kind: PhantomData<meilisearch_types::settings::Unchecked> } }, kind: SettingsUpdate { index_uid: "doggos", new_settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: Set({"catto"}), sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: NotSet, search_cutoff_ms: NotSet, localized_attributes: NotSet, _kind: PhantomData<meilisearch_types::settings::Unchecked> }, is_deletion: false, allow_index_creation: true }}
1 {uid: 1, status: enqueued, details: { received_documents: 3, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggos", primary_key: Some("id"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 3, allow_index_creation: true }}
----------------------------------------------------------------------
### Status:
enqueued [0,1,]
----------------------------------------------------------------------
### Kind:
"documentAdditionOrUpdate" [1,]
"settingsUpdate" [0,]
----------------------------------------------------------------------
### Index Tasks:
doggos [0,1,]
----------------------------------------------------------------------
### Index Mapper:
----------------------------------------------------------------------
### Canceled By:
----------------------------------------------------------------------
### Enqueued At:
[timestamp] [0,]
[timestamp] [1,]
----------------------------------------------------------------------
### Started At:
----------------------------------------------------------------------
### Finished At:
----------------------------------------------------------------------
### File Store:
00000000-0000-0000-0000-000000000000
----------------------------------------------------------------------

View File

@ -115,7 +115,8 @@ make_locale! {
Slk,
Cat,
Tgl,
Hye
Hye,
Zho
}
impl std::error::Error for LocaleFormatError {}

View File

@ -13,11 +13,10 @@ pub mod search_queue;
use std::fs::File;
use std::io::{BufReader, BufWriter};
use std::num::NonZeroUsize;
use std::path::Path;
use std::str::FromStr;
use std::sync::Arc;
use std::thread::{self, available_parallelism};
use std::thread;
use std::time::Duration;
use actix_cors::Cors;
@ -118,6 +117,7 @@ pub type LogStderrType = tracing_subscriber::filter::Filtered<
pub fn create_app(
index_scheduler: Data<IndexScheduler>,
auth_controller: Data<AuthController>,
search_queue: Data<SearchQueue>,
opt: Opt,
logs: (LogRouteHandle, LogStderrHandle),
analytics: Arc<dyn Analytics>,
@ -137,6 +137,7 @@ pub fn create_app(
s,
index_scheduler.clone(),
auth_controller.clone(),
search_queue.clone(),
&opt,
logs,
analytics.clone(),
@ -469,19 +470,16 @@ pub fn configure_data(
config: &mut web::ServiceConfig,
index_scheduler: Data<IndexScheduler>,
auth: Data<AuthController>,
search_queue: Data<SearchQueue>,
opt: &Opt,
(logs_route, logs_stderr): (LogRouteHandle, LogStderrHandle),
analytics: Arc<dyn Analytics>,
) {
let search_queue = SearchQueue::new(
opt.experimental_search_queue_size,
available_parallelism().unwrap_or(NonZeroUsize::new(2).unwrap()),
);
let http_payload_size_limit = opt.http_payload_size_limit.as_u64() as usize;
config
.app_data(index_scheduler)
.app_data(auth)
.app_data(web::Data::new(search_queue))
.app_data(search_queue)
.app_data(web::Data::from(analytics))
.app_data(web::Data::new(logs_route))
.app_data(web::Data::new(logs_stderr))

View File

@ -1,8 +1,10 @@
use std::env;
use std::io::{stderr, LineWriter, Write};
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use std::thread::available_parallelism;
use actix_web::http::KeepAlive;
use actix_web::web::Data;
@ -11,6 +13,7 @@ use index_scheduler::IndexScheduler;
use is_terminal::IsTerminal;
use meilisearch::analytics::Analytics;
use meilisearch::option::LogMode;
use meilisearch::search_queue::SearchQueue;
use meilisearch::{
analytics, create_app, setup_meilisearch, LogRouteHandle, LogRouteType, LogStderrHandle,
LogStderrType, Opt, SubscriberForSecondLayer,
@ -148,11 +151,17 @@ async fn run_http(
let opt_clone = opt.clone();
let index_scheduler = Data::from(index_scheduler);
let auth_controller = Data::from(auth_controller);
let search_queue = SearchQueue::new(
opt.experimental_search_queue_size,
available_parallelism().unwrap_or(NonZeroUsize::new(2).unwrap()),
);
let search_queue = Data::new(search_queue);
let http_server = HttpServer::new(move || {
create_app(
index_scheduler.clone(),
auth_controller.clone(),
search_queue.clone(),
opt.clone(),
logs.clone(),
analytics.clone(),

View File

@ -81,7 +81,7 @@ pub async fn search(
let index = index_scheduler.index(&index_uid)?;
let features = index_scheduler.features();
let search_kind = search_kind(&search_query, &index_scheduler, &index, features)?;
let _permit = search_queue.try_get_search_permit().await?;
let permit = search_queue.try_get_search_permit().await?;
let search_result = tokio::task::spawn_blocking(move || {
perform_facet_search(
&index,
@ -93,7 +93,9 @@ pub async fn search(
locales,
)
})
.await?;
.await;
permit.drop().await;
let search_result = search_result?;
if let Ok(ref search_result) = search_result {
aggregate.succeed(search_result);

View File

@ -233,11 +233,13 @@ pub async fn search_with_url_query(
let search_kind = search_kind(&query, index_scheduler.get_ref(), &index, features)?;
let retrieve_vector = RetrieveVectors::new(query.retrieve_vectors, features)?;
let _permit = search_queue.try_get_search_permit().await?;
let permit = search_queue.try_get_search_permit().await?;
let search_result = tokio::task::spawn_blocking(move || {
perform_search(&index, query, search_kind, retrieve_vector, index_scheduler.features())
})
.await?;
.await;
permit.drop().await;
let search_result = search_result?;
if let Ok(ref search_result) = search_result {
aggregate.succeed(search_result);
}
@ -276,11 +278,13 @@ pub async fn search_with_post(
let search_kind = search_kind(&query, index_scheduler.get_ref(), &index, features)?;
let retrieve_vectors = RetrieveVectors::new(query.retrieve_vectors, features)?;
let _permit = search_queue.try_get_search_permit().await?;
let permit = search_queue.try_get_search_permit().await?;
let search_result = tokio::task::spawn_blocking(move || {
perform_search(&index, query, search_kind, retrieve_vectors, index_scheduler.features())
})
.await?;
.await;
permit.drop().await;
let search_result = search_result?;
if let Ok(ref search_result) = search_result {
aggregate.succeed(search_result);
if search_result.degraded {

View File

@ -39,7 +39,7 @@ pub async fn multi_search_with_post(
) -> Result<HttpResponse, ResponseError> {
// Since we don't want to process half of the search requests and then get a permit refused
// we're going to get one permit for the whole duration of the multi-search request.
let _permit = search_queue.try_get_search_permit().await?;
let permit = search_queue.try_get_search_permit().await?;
let federated_search = params.into_inner();
@ -81,6 +81,7 @@ pub async fn multi_search_with_post(
perform_federated_search(&index_scheduler, queries, federation, features)
})
.await;
permit.drop().await;
if let Ok(Ok(_)) = search_result {
multi_aggregate.succeed();
@ -143,6 +144,7 @@ pub async fn multi_search_with_post(
Ok(search_results)
}
.await;
permit.drop().await;
if search_results.is_ok() {
multi_aggregate.succeed();

View File

@ -18,6 +18,7 @@
//! And should drop the Permit only once you have freed all the RAM consumed by the method.
use std::num::NonZeroUsize;
use std::time::Duration;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
@ -29,16 +30,31 @@ use crate::error::MeilisearchHttpError;
pub struct SearchQueue {
sender: mpsc::Sender<oneshot::Sender<Permit>>,
capacity: usize,
/// If we have waited longer than this to get a permit, we should abort the search request entirely.
/// The client probably already closed the connection, but we have no way to find out.
time_to_abort: Duration,
}
/// You should only run search requests while holding this permit.
/// Once it's dropped, a new search request will be able to process.
/// You should always try to drop the permit yourself calling the `drop` async method on it.
#[derive(Debug)]
pub struct Permit {
sender: mpsc::Sender<()>,
}
impl Permit {
/// Drop the permit giving back on permit to the search queue.
pub async fn drop(self) {
// if the channel is closed then the whole instance is down
let _ = self.sender.send(()).await;
}
}
impl Drop for Permit {
/// The implicit drop implementation can still be called in multiple cases:
/// - We forgot to call the explicit one somewhere => this should be fixed on our side asap
/// - The future is cancelled while running and the permit dropped with it
fn drop(&mut self) {
let sender = self.sender.clone();
// if the channel is closed then the whole instance is down
@ -53,7 +69,11 @@ impl SearchQueue {
let (sender, receiver) = mpsc::channel(1);
tokio::task::spawn(Self::run(capacity, paralellism, receiver));
Self { sender, capacity }
Self { sender, capacity, time_to_abort: Duration::from_secs(60) }
}
pub fn with_time_to_abort(self, time_to_abort: Duration) -> Self {
Self { time_to_abort, ..self }
}
/// This function is the main loop, it's in charge on scheduling which search request should execute first and
@ -119,9 +139,23 @@ impl SearchQueue {
/// Returns a search `Permit`.
/// It should be dropped as soon as you've freed all the RAM associated with the search request being processed.
pub async fn try_get_search_permit(&self) -> Result<Permit, MeilisearchHttpError> {
let now = std::time::Instant::now();
let (sender, receiver) = oneshot::channel();
self.sender.send(sender).await.map_err(|_| MeilisearchHttpError::SearchLimiterIsDown)?;
receiver.await.map_err(|_| MeilisearchHttpError::TooManySearchRequests(self.capacity))
let permit = receiver
.await
.map_err(|_| MeilisearchHttpError::TooManySearchRequests(self.capacity))?;
// If we've been for more than one minute to get a search permit, it's better to simply
// abort the search request than spending time processing something were the client
// most certainly exited or got a timeout a long time ago.
// We may find a better solution in https://github.com/actix/actix-web/issues/3462.
if now.elapsed() > self.time_to_abort {
permit.drop().await;
Err(MeilisearchHttpError::TooManySearchRequests(self.capacity))
} else {
Ok(permit)
}
}
/// Returns `Ok(())` if everything seems normal.

View File

@ -1,7 +1,9 @@
#![allow(dead_code)]
use std::num::NonZeroUsize;
use std::path::Path;
use std::str::FromStr as _;
use std::sync::Arc;
use std::time::Duration;
use actix_http::body::MessageBody;
@ -10,6 +12,7 @@ use actix_web::http::StatusCode;
use byte_unit::{Byte, Unit};
use clap::Parser;
use meilisearch::option::{IndexerOpts, MaxMemory, MaxThreads, Opt};
use meilisearch::search_queue::SearchQueue;
use meilisearch::{analytics, create_app, setup_meilisearch, SubscriberForSecondLayer};
use once_cell::sync::Lazy;
use tempfile::TempDir;
@ -32,6 +35,12 @@ pub struct Server {
pub static TEST_TEMP_DIR: Lazy<TempDir> = Lazy::new(|| TempDir::new().unwrap());
impl Server {
fn new_search_queue(options: &Opt) -> Arc<SearchQueue> {
let search_queue =
SearchQueue::new(options.experimental_search_queue_size, NonZeroUsize::new(1).unwrap());
Arc::new(search_queue)
}
pub async fn new() -> Self {
let dir = TempDir::new().unwrap();
@ -44,7 +53,13 @@ impl Server {
let options = default_settings(dir.path());
let (index_scheduler, auth) = setup_meilisearch(&options).unwrap();
let service = Service { index_scheduler, auth, options, api_key: None };
let service = Service {
index_scheduler,
auth,
search_queue: Self::new_search_queue(&options),
options,
api_key: None,
};
Server { service, _dir: Some(dir) }
}
@ -59,7 +74,13 @@ impl Server {
options.master_key = Some("MASTER_KEY".to_string());
let (index_scheduler, auth) = setup_meilisearch(&options).unwrap();
let service = Service { index_scheduler, auth, options, api_key: None };
let service = Service {
index_scheduler,
auth,
search_queue: Self::new_search_queue(&options),
options,
api_key: None,
};
Server { service, _dir: Some(dir) }
}
@ -72,7 +93,13 @@ impl Server {
pub async fn new_with_options(options: Opt) -> Result<Self, anyhow::Error> {
let (index_scheduler, auth) = setup_meilisearch(&options)?;
let service = Service { index_scheduler, auth, options, api_key: None };
let service = Service {
index_scheduler,
auth,
search_queue: Self::new_search_queue(&options),
options,
api_key: None,
};
Ok(Server { service, _dir: None })
}
@ -100,6 +127,7 @@ impl Server {
actix_web::test::init_service(create_app(
self.service.index_scheduler.clone().into(),
self.service.auth.clone().into(),
self.service.search_queue.clone().into(),
self.service.options.clone(),
(route_layer_handle, stderr_layer_handle),
analytics::MockAnalytics::new(&self.service.options),

View File

@ -5,6 +5,7 @@ use actix_web::http::StatusCode;
use actix_web::test;
use actix_web::test::TestRequest;
use index_scheduler::IndexScheduler;
use meilisearch::search_queue::SearchQueue;
use meilisearch::{analytics, create_app, Opt, SubscriberForSecondLayer};
use meilisearch_auth::AuthController;
use tracing::level_filters::LevelFilter;
@ -16,6 +17,7 @@ use crate::common::Value;
pub struct Service {
pub index_scheduler: Arc<IndexScheduler>,
pub auth: Arc<AuthController>,
pub search_queue: Arc<SearchQueue>,
pub options: Opt,
pub api_key: Option<String>,
}
@ -123,6 +125,7 @@ impl Service {
let app = test::init_service(create_app(
self.index_scheduler.clone().into(),
self.auth.clone().into(),
self.search_queue.clone().into(),
self.options.clone(),
(route_layer_handle, stderr_layer_handle),
analytics::MockAnalytics::new(&self.options),

View File

@ -44,6 +44,7 @@ async fn basic_test_log_stream_route() {
let app = actix_web::test::init_service(create_app(
server.service.index_scheduler.clone().into(),
server.service.auth.clone().into(),
server.service.search_queue.clone().into(),
server.service.options.clone(),
(route_layer_handle, stderr_layer_handle),
analytics::MockAnalytics::new(&server.service.options),

View File

@ -1355,7 +1355,7 @@ async fn invalid_locales() {
snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r###"
{
"message": "Unknown value `invalid` at `.locales[0]`: expected one of `epo`, `eng`, `rus`, `cmn`, `spa`, `por`, `ita`, `ben`, `fra`, `deu`, `ukr`, `kat`, `ara`, `hin`, `jpn`, `heb`, `yid`, `pol`, `amh`, `jav`, `kor`, `nob`, `dan`, `swe`, `fin`, `tur`, `nld`, `hun`, `ces`, `ell`, `bul`, `bel`, `mar`, `kan`, `ron`, `slv`, `hrv`, `srp`, `mkd`, `lit`, `lav`, `est`, `tam`, `vie`, `urd`, `tha`, `guj`, `uzb`, `pan`, `aze`, `ind`, `tel`, `pes`, `mal`, `ori`, `mya`, `nep`, `sin`, `khm`, `tuk`, `aka`, `zul`, `sna`, `afr`, `lat`, `slk`, `cat`, `tgl`, `hye`",
"message": "Unknown value `invalid` at `.locales[0]`: expected one of `epo`, `eng`, `rus`, `cmn`, `spa`, `por`, `ita`, `ben`, `fra`, `deu`, `ukr`, `kat`, `ara`, `hin`, `jpn`, `heb`, `yid`, `pol`, `amh`, `jav`, `kor`, `nob`, `dan`, `swe`, `fin`, `tur`, `nld`, `hun`, `ces`, `ell`, `bul`, `bel`, `mar`, `kan`, `ron`, `slv`, `hrv`, `srp`, `mkd`, `lit`, `lav`, `est`, `tam`, `vie`, `urd`, `tha`, `guj`, `uzb`, `pan`, `aze`, `ind`, `tel`, `pes`, `mal`, `ori`, `mya`, `nep`, `sin`, `khm`, `tuk`, `aka`, `zul`, `sna`, `afr`, `lat`, `slk`, `cat`, `tgl`, `hye`, `zho`",
"code": "invalid_search_locales",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_search_locales"
@ -1368,7 +1368,7 @@ async fn invalid_locales() {
snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r###"
{
"message": "Invalid value in parameter `locales`: Unsupported locale `invalid`, expected one of `epo`, `eng`, `rus`, `cmn`, `spa`, `por`, `ita`, `ben`, `fra`, `deu`, `ukr`, `kat`, `ara`, `hin`, `jpn`, `heb`, `yid`, `pol`, `amh`, `jav`, `kor`, `nob`, `dan`, `swe`, `fin`, `tur`, `nld`, `hun`, `ces`, `ell`, `bul`, `bel`, `mar`, `kan`, `ron`, `slv`, `hrv`, `srp`, `mkd`, `lit`, `lav`, `est`, `tam`, `vie`, `urd`, `tha`, `guj`, `uzb`, `pan`, `aze`, `ind`, `tel`, `pes`, `mal`, `ori`, `mya`, `nep`, `sin`, `khm`, `tuk`, `aka`, `zul`, `sna`, `afr`, `lat`, `slk`, `cat`, `tgl`, `hye`",
"message": "Invalid value in parameter `locales`: Unsupported locale `invalid`, expected one of `epo`, `eng`, `rus`, `cmn`, `spa`, `por`, `ita`, `ben`, `fra`, `deu`, `ukr`, `kat`, `ara`, `hin`, `jpn`, `heb`, `yid`, `pol`, `amh`, `jav`, `kor`, `nob`, `dan`, `swe`, `fin`, `tur`, `nld`, `hun`, `ces`, `ell`, `bul`, `bel`, `mar`, `kan`, `ron`, `slv`, `hrv`, `srp`, `mkd`, `lit`, `lav`, `est`, `tam`, `vie`, `urd`, `tha`, `guj`, `uzb`, `pan`, `aze`, `ind`, `tel`, `pes`, `mal`, `ori`, `mya`, `nep`, `sin`, `khm`, `tuk`, `aka`, `zul`, `sna`, `afr`, `lat`, `slk`, `cat`, `tgl`, `hye`, `zho`",
"code": "invalid_search_locales",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_search_locales"
@ -1390,7 +1390,7 @@ async fn invalid_localized_attributes_rules() {
.await;
snapshot!(response, @r###"
{
"message": "Unknown value `japan` at `.localizedAttributes[0].locales[0]`: expected one of `epo`, `eng`, `rus`, `cmn`, `spa`, `por`, `ita`, `ben`, `fra`, `deu`, `ukr`, `kat`, `ara`, `hin`, `jpn`, `heb`, `yid`, `pol`, `amh`, `jav`, `kor`, `nob`, `dan`, `swe`, `fin`, `tur`, `nld`, `hun`, `ces`, `ell`, `bul`, `bel`, `mar`, `kan`, `ron`, `slv`, `hrv`, `srp`, `mkd`, `lit`, `lav`, `est`, `tam`, `vie`, `urd`, `tha`, `guj`, `uzb`, `pan`, `aze`, `ind`, `tel`, `pes`, `mal`, `ori`, `mya`, `nep`, `sin`, `khm`, `tuk`, `aka`, `zul`, `sna`, `afr`, `lat`, `slk`, `cat`, `tgl`, `hye`",
"message": "Unknown value `japan` at `.localizedAttributes[0].locales[0]`: expected one of `epo`, `eng`, `rus`, `cmn`, `spa`, `por`, `ita`, `ben`, `fra`, `deu`, `ukr`, `kat`, `ara`, `hin`, `jpn`, `heb`, `yid`, `pol`, `amh`, `jav`, `kor`, `nob`, `dan`, `swe`, `fin`, `tur`, `nld`, `hun`, `ces`, `ell`, `bul`, `bel`, `mar`, `kan`, `ron`, `slv`, `hrv`, `srp`, `mkd`, `lit`, `lav`, `est`, `tam`, `vie`, `urd`, `tha`, `guj`, `uzb`, `pan`, `aze`, `ind`, `tel`, `pes`, `mal`, `ori`, `mya`, `nep`, `sin`, `khm`, `tuk`, `aka`, `zul`, `sna`, `afr`, `lat`, `slk`, `cat`, `tgl`, `hye`, `zho`",
"code": "invalid_settings_localized_attributes",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_settings_localized_attributes"

View File

@ -37,6 +37,43 @@ async fn search_queue_register() {
.unwrap();
}
#[actix_rt::test]
async fn search_queue_register_with_explicit_drop() {
let queue = SearchQueue::new(4, NonZeroUsize::new(2).unwrap());
// First, use all the cores
let permit1 = queue.try_get_search_permit().await.unwrap();
let _permit2 = queue.try_get_search_permit().await.unwrap();
// If we free one spot we should be able to register one new search
permit1.drop().await;
let permit3 = queue.try_get_search_permit().await.unwrap();
// And again
permit3.drop().await;
let _permit4 = queue.try_get_search_permit().await.unwrap();
}
#[actix_rt::test]
async fn search_queue_register_with_time_to_abort() {
let queue = Arc::new(
SearchQueue::new(1, NonZeroUsize::new(1).unwrap())
.with_time_to_abort(Duration::from_secs(1)),
);
// First, use all the cores
let permit1 = queue.try_get_search_permit().await.unwrap();
let q = queue.clone();
let permit2 = tokio::task::spawn(async move { q.try_get_search_permit().await });
tokio::time::sleep(Duration::from_secs(1)).await;
permit1.drop().await;
let ret = permit2.await.unwrap();
snapshot!(ret.unwrap_err(), @"Too many search requests running at the same time: 1. Retry after 10s.");
}
#[actix_rt::test]
async fn wait_till_cores_are_available() {
let queue = Arc::new(SearchQueue::new(4, NonZeroUsize::new(1).unwrap()));

View File

@ -17,7 +17,7 @@ bincode = "1.3.3"
bstr = "1.9.1"
bytemuck = { version = "1.16.1", features = ["extern_crate_alloc"] }
byteorder = "1.5.0"
charabia = { version = "0.9.0", default-features = false }
charabia = { version = "0.9.1", default-features = false }
concat-arrays = "0.1.2"
crossbeam-channel = "0.5.13"
deserr = "0.6.2"
@ -106,6 +106,7 @@ all-tokenizations = [
"charabia/greek",
"charabia/khmer",
"charabia/vietnamese",
"charabia/swedish-recomposition",
]
# Use POSIX semaphores instead of SysV semaphores in LMDB