mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-09-04 03:36:30 +00:00
Merge pull request #5784 from meilisearch/sharding-split-docs
Sharding and EE license
This commit is contained in:
8
Cargo.lock
generated
8
Cargo.lock
generated
@ -3746,6 +3746,7 @@ dependencies = [
|
||||
"actix-web-lab",
|
||||
"anyhow",
|
||||
"async-openai",
|
||||
"backoff",
|
||||
"brotli",
|
||||
"bstr",
|
||||
"build-info",
|
||||
@ -3989,6 +3990,7 @@ dependencies = [
|
||||
"time",
|
||||
"tokenizers",
|
||||
"tracing",
|
||||
"twox-hash",
|
||||
"ureq",
|
||||
"url",
|
||||
"utoipa",
|
||||
@ -6442,6 +6444,12 @@ version = "0.2.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
|
||||
|
||||
[[package]]
|
||||
name = "twox-hash"
|
||||
version = "2.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8b907da542cbced5261bd3256de1b3a1bf340a3d37f93425a07362a1d687de56"
|
||||
|
||||
[[package]]
|
||||
name = "typeid"
|
||||
version = "1.0.3"
|
||||
|
8
LICENSE
8
LICENSE
@ -19,3 +19,11 @@ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
|
||||
---
|
||||
|
||||
🔒 Meilisearch Enterprise Edition (EE)
|
||||
|
||||
Certain parts of this codebase are not licensed under the MIT license and governed by the Business Source License 1.1.
|
||||
|
||||
See the LICENSE-EE file for details.
|
||||
|
67
LICENSE-EE
Normal file
67
LICENSE-EE
Normal file
@ -0,0 +1,67 @@
|
||||
Business Source License 1.1 – Adapted for Meili SAS
|
||||
This license is based on the Business Source License version 1.1, as published by MariaDB Corporation Ab.
|
||||
|
||||
Parameters
|
||||
|
||||
Licensor: Meili SAS
|
||||
|
||||
Licensed Work: Any file explicitly marked as “Enterprise Edition (EE)” or “governed by the Business Source License” residing in enterprise_editions modules/folders.
|
||||
|
||||
Additional Use Grant:
|
||||
You may use, modify, and distribute the Licensed Work for non-production purposes only, such as testing, development, or evaluation.
|
||||
|
||||
Production use of the Licensed Work requires a commercial license agreement with Meilisearch. Contact bonjour@meilisearch.com for licensing.
|
||||
|
||||
Change License: MIT
|
||||
|
||||
Change Date: Four years from the date the Licensed Work is published.
|
||||
|
||||
This License does not apply to any code outside of the Licensed Work, which remains under the MIT license.
|
||||
|
||||
For information about alternative licensing arrangements for the Licensed Work,
|
||||
please contact bonjour@meilisearch.com or sales@meilisearch.com.
|
||||
|
||||
Notice
|
||||
|
||||
Business Source License 1.1
|
||||
|
||||
Terms
|
||||
|
||||
The Licensor hereby grants you the right to copy, modify, create derivative
|
||||
works, redistribute, and make non-production use of the Licensed Work. The
|
||||
Licensor may make an Additional Use Grant, above, permitting limited production use.
|
||||
|
||||
Effective on the Change Date, or the fourth anniversary of the first publicly
|
||||
available distribution of a specific version of the Licensed Work under this
|
||||
License, whichever comes first, the Licensor hereby grants you rights under
|
||||
the terms of the Change License, and the rights granted in the paragraph
|
||||
above terminate.
|
||||
|
||||
If your use of the Licensed Work does not comply with the requirements
|
||||
currently in effect as described in this License, you must purchase a
|
||||
commercial license from the Licensor, its affiliated entities, or authorized
|
||||
resellers, or you must refrain from using the Licensed Work.
|
||||
|
||||
All copies of the original and modified Licensed Work, and derivative works
|
||||
of the Licensed Work, are subject to this License. This License applies
|
||||
separately for each version of the Licensed Work and the Change Date may vary
|
||||
for each version of the Licensed Work released by Licensor.
|
||||
|
||||
You must conspicuously display this License on each original or modified copy
|
||||
of the Licensed Work. If you receive the Licensed Work in original or
|
||||
modified form from a third party, the terms and conditions set forth in this
|
||||
License apply to your use of that work.
|
||||
|
||||
Any use of the Licensed Work in violation of this License will automatically
|
||||
terminate your rights under this License for the current and all other
|
||||
versions of the Licensed Work.
|
||||
|
||||
This License does not grant you any right in any trademark or logo of
|
||||
Licensor or its affiliates (provided that you may use a trademark or logo of
|
||||
Licensor as expressly required by this License).
|
||||
|
||||
TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
|
||||
AN "AS IS" BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
|
||||
EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
|
||||
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
|
||||
TITLE.
|
20
README.md
20
README.md
@ -89,6 +89,26 @@ We also offer a wide range of dedicated guides to all Meilisearch features, such
|
||||
|
||||
Finally, for more in-depth information, refer to our articles explaining fundamental Meilisearch concepts such as [documents](https://www.meilisearch.com/docs/learn/core_concepts/documents?utm_campaign=oss&utm_source=github&utm_medium=meilisearch&utm_content=advanced) and [indexes](https://www.meilisearch.com/docs/learn/core_concepts/indexes?utm_campaign=oss&utm_source=github&utm_medium=meilisearch&utm_content=advanced).
|
||||
|
||||
## 🧾 Editions & Licensing
|
||||
|
||||
Meilisearch is available in two editions:
|
||||
|
||||
### 🧪 Community Edition (CE)
|
||||
|
||||
- Fully open source under the [MIT license](./LICENSE)
|
||||
- Core search engine with fast and relevant full-text, semantic or hybrid search
|
||||
- Free to use for anyone, including commercial usage
|
||||
|
||||
### 🏢 Enterprise Edition (EE)
|
||||
|
||||
- Includes advanced features such as:
|
||||
- Sharding
|
||||
- Governed by a [commercial license](./LICENSE-EE) or the [Business Source License 1.1](https://mariadb.com/bsl11)
|
||||
- Not allowed in production without a commercial agreement with Meilisearch.
|
||||
- You may use, modify, and distribute the Licensed Work for non-production purposes only, such as testing, development, or evaluation.
|
||||
|
||||
Want access to Enterprise features? → Contact us at [sales@meilisearch.com](maito:sales@meilisearch.com).
|
||||
|
||||
## 📊 Telemetry
|
||||
|
||||
Meilisearch collects **anonymized** user data to help us improve our product. You can [deactivate this](https://www.meilisearch.com/docs/learn/what_is_meilisearch/telemetry?utm_campaign=oss&utm_source=github&utm_medium=meilisearch&utm_content=telemetry#how-to-disable-data-collection) whenever you want.
|
||||
|
@ -154,6 +154,7 @@ fn indexing_songs_default(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -221,6 +222,7 @@ fn reindexing_songs_default(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -266,6 +268,7 @@ fn reindexing_songs_default(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -335,6 +338,7 @@ fn deleting_songs_in_batches_default(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -412,6 +416,7 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -457,6 +462,7 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -498,6 +504,7 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -566,6 +573,7 @@ fn indexing_songs_without_faceted_numbers(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -633,6 +641,7 @@ fn indexing_songs_without_faceted_fields(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -700,6 +709,7 @@ fn indexing_wiki(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -766,6 +776,7 @@ fn reindexing_wiki(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -811,6 +822,7 @@ fn reindexing_wiki(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -879,6 +891,7 @@ fn deleting_wiki_in_batches_default(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -956,6 +969,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -1002,6 +1016,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -1044,6 +1059,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -1111,6 +1127,7 @@ fn indexing_movies_default(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -1177,6 +1194,7 @@ fn reindexing_movies_default(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -1222,6 +1240,7 @@ fn reindexing_movies_default(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -1290,6 +1309,7 @@ fn deleting_movies_in_batches_default(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -1404,6 +1424,7 @@ fn indexing_movies_in_three_batches(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -1449,6 +1470,7 @@ fn indexing_movies_in_three_batches(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -1490,6 +1512,7 @@ fn indexing_movies_in_three_batches(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -1580,6 +1603,7 @@ fn indexing_nested_movies_default(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -1671,6 +1695,7 @@ fn deleting_nested_movies_in_batches_default(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -1754,6 +1779,7 @@ fn indexing_nested_movies_without_faceted_fields(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -1821,6 +1847,7 @@ fn indexing_geo(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -1887,6 +1914,7 @@ fn reindexing_geo(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -1932,6 +1960,7 @@ fn reindexing_geo(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -2000,6 +2029,7 @@ fn deleting_geo_in_batches_default(c: &mut Criterion) {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
@ -123,6 +123,7 @@ pub fn base_setup(conf: &Conf) -> Index {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
@ -10,7 +10,7 @@ use meilisearch_types::keys::Key;
|
||||
use meilisearch_types::milli::update::IndexDocumentsMethod;
|
||||
use meilisearch_types::settings::Unchecked;
|
||||
use meilisearch_types::tasks::{
|
||||
Details, ExportIndexSettings, IndexSwap, KindWithContent, Status, Task, TaskId,
|
||||
Details, ExportIndexSettings, IndexSwap, KindWithContent, Status, Task, TaskId, TaskNetwork,
|
||||
};
|
||||
use meilisearch_types::InstanceUid;
|
||||
use roaring::RoaringBitmap;
|
||||
@ -94,6 +94,8 @@ pub struct TaskDump {
|
||||
default
|
||||
)]
|
||||
pub finished_at: Option<OffsetDateTime>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub network: Option<TaskNetwork>,
|
||||
}
|
||||
|
||||
// A `Kind` specific version made for the dump. If modified you may break the dump.
|
||||
@ -172,6 +174,7 @@ impl From<Task> for TaskDump {
|
||||
enqueued_at: task.enqueued_at,
|
||||
started_at: task.started_at,
|
||||
finished_at: task.finished_at,
|
||||
network: task.network,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -250,8 +253,9 @@ pub(crate) mod test {
|
||||
use big_s::S;
|
||||
use maplit::{btreemap, btreeset};
|
||||
use meilisearch_types::batches::{Batch, BatchEnqueuedAt, BatchStats};
|
||||
use meilisearch_types::enterprise_edition::network::{Network, Remote};
|
||||
use meilisearch_types::facet_values_sort::FacetValuesSort;
|
||||
use meilisearch_types::features::{Network, Remote, RuntimeTogglableFeatures};
|
||||
use meilisearch_types::features::RuntimeTogglableFeatures;
|
||||
use meilisearch_types::index_uid_pattern::IndexUidPattern;
|
||||
use meilisearch_types::keys::{Action, Key};
|
||||
use meilisearch_types::milli::update::Setting;
|
||||
@ -384,6 +388,7 @@ pub(crate) mod test {
|
||||
enqueued_at: datetime!(2022-11-11 0:00 UTC),
|
||||
started_at: Some(datetime!(2022-11-20 0:00 UTC)),
|
||||
finished_at: Some(datetime!(2022-11-21 0:00 UTC)),
|
||||
network: None,
|
||||
},
|
||||
None,
|
||||
),
|
||||
@ -408,6 +413,7 @@ pub(crate) mod test {
|
||||
enqueued_at: datetime!(2022-11-11 0:00 UTC),
|
||||
started_at: None,
|
||||
finished_at: None,
|
||||
network: None,
|
||||
},
|
||||
Some(vec![
|
||||
json!({ "id": 4, "race": "leonberg" }).as_object().unwrap().clone(),
|
||||
@ -427,6 +433,7 @@ pub(crate) mod test {
|
||||
enqueued_at: datetime!(2022-11-15 0:00 UTC),
|
||||
started_at: None,
|
||||
finished_at: None,
|
||||
network: None,
|
||||
},
|
||||
None,
|
||||
),
|
||||
@ -539,7 +546,8 @@ pub(crate) mod test {
|
||||
fn create_test_network() -> Network {
|
||||
Network {
|
||||
local: Some("myself".to_string()),
|
||||
remotes: maplit::btreemap! {"other".to_string() => Remote { url: "http://test".to_string(), search_api_key: Some("apiKey".to_string()) }},
|
||||
remotes: maplit::btreemap! {"other".to_string() => Remote { url: "http://test".to_string(), search_api_key: Some("apiKey".to_string()), write_api_key: Some("docApiKey".to_string()) }},
|
||||
sharding: false,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -163,6 +163,7 @@ impl CompatV5ToV6 {
|
||||
enqueued_at: task_view.enqueued_at,
|
||||
started_at: task_view.started_at,
|
||||
finished_at: task_view.finished_at,
|
||||
network: None,
|
||||
};
|
||||
|
||||
(task, content_file)
|
||||
|
@ -24,7 +24,7 @@ pub type Batch = meilisearch_types::batches::Batch;
|
||||
pub type Key = meilisearch_types::keys::Key;
|
||||
pub type ChatCompletionSettings = meilisearch_types::features::ChatCompletionSettings;
|
||||
pub type RuntimeTogglableFeatures = meilisearch_types::features::RuntimeTogglableFeatures;
|
||||
pub type Network = meilisearch_types::features::Network;
|
||||
pub type Network = meilisearch_types::enterprise_edition::network::Network;
|
||||
pub type Webhooks = meilisearch_types::webhooks::WebhooksDumpView;
|
||||
|
||||
// ===== Other types to clarify the code of the compat module
|
||||
|
@ -5,7 +5,8 @@ use std::path::PathBuf;
|
||||
use flate2::write::GzEncoder;
|
||||
use flate2::Compression;
|
||||
use meilisearch_types::batches::Batch;
|
||||
use meilisearch_types::features::{ChatCompletionSettings, Network, RuntimeTogglableFeatures};
|
||||
use meilisearch_types::enterprise_edition::network::Network;
|
||||
use meilisearch_types::features::{ChatCompletionSettings, RuntimeTogglableFeatures};
|
||||
use meilisearch_types::keys::Key;
|
||||
use meilisearch_types::settings::{Checked, Settings};
|
||||
use meilisearch_types::webhooks::WebhooksDumpView;
|
||||
|
@ -148,11 +148,10 @@ impl File {
|
||||
Ok(Self { path: PathBuf::new(), file: None })
|
||||
}
|
||||
|
||||
pub fn persist(self) -> Result<()> {
|
||||
if let Some(file) = self.file {
|
||||
file.persist(&self.path)?;
|
||||
}
|
||||
Ok(())
|
||||
pub fn persist(self) -> Result<Option<StdFile>> {
|
||||
let Some(file) = self.file else { return Ok(None) };
|
||||
|
||||
Ok(Some(file.persist(&self.path)?))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -129,6 +129,7 @@ fn main() {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
@ -147,6 +147,7 @@ impl<'a> Dump<'a> {
|
||||
canceled_by: task.canceled_by,
|
||||
details: task.details,
|
||||
status: task.status,
|
||||
network: task.network,
|
||||
kind: match task.kind {
|
||||
KindDump::DocumentImport {
|
||||
primary_key,
|
||||
|
@ -1,6 +1,7 @@
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use meilisearch_types::features::{InstanceTogglableFeatures, Network, RuntimeTogglableFeatures};
|
||||
use meilisearch_types::enterprise_edition::network::Network;
|
||||
use meilisearch_types::features::{InstanceTogglableFeatures, RuntimeTogglableFeatures};
|
||||
use meilisearch_types::heed::types::{SerdeJson, Str};
|
||||
use meilisearch_types::heed::{Database, Env, RwTxn, WithoutTls};
|
||||
|
||||
|
@ -230,6 +230,7 @@ pub fn snapshot_task(task: &Task) -> String {
|
||||
details,
|
||||
status,
|
||||
kind,
|
||||
network,
|
||||
} = task;
|
||||
snap.push('{');
|
||||
snap.push_str(&format!("uid: {uid}, "));
|
||||
@ -247,6 +248,9 @@ pub fn snapshot_task(task: &Task) -> String {
|
||||
snap.push_str(&format!("details: {}, ", &snapshot_details(details)));
|
||||
}
|
||||
snap.push_str(&format!("kind: {kind:?}"));
|
||||
if let Some(network) = network {
|
||||
snap.push_str(&format!("network: {network:?}, "))
|
||||
}
|
||||
|
||||
snap.push('}');
|
||||
snap
|
||||
|
@ -51,8 +51,9 @@ pub use features::RoFeatures;
|
||||
use flate2::bufread::GzEncoder;
|
||||
use flate2::Compression;
|
||||
use meilisearch_types::batches::Batch;
|
||||
use meilisearch_types::enterprise_edition::network::Network;
|
||||
use meilisearch_types::features::{
|
||||
ChatCompletionSettings, InstanceTogglableFeatures, Network, RuntimeTogglableFeatures,
|
||||
ChatCompletionSettings, InstanceTogglableFeatures, RuntimeTogglableFeatures,
|
||||
};
|
||||
use meilisearch_types::heed::byteorder::BE;
|
||||
use meilisearch_types::heed::types::{DecodeIgnore, SerdeJson, Str, I128};
|
||||
@ -64,7 +65,7 @@ use meilisearch_types::milli::vector::{
|
||||
};
|
||||
use meilisearch_types::milli::{self, Index};
|
||||
use meilisearch_types::task_view::TaskView;
|
||||
use meilisearch_types::tasks::{KindWithContent, Task};
|
||||
use meilisearch_types::tasks::{KindWithContent, Task, TaskNetwork};
|
||||
use meilisearch_types::webhooks::{Webhook, WebhooksDumpView, WebhooksView};
|
||||
use milli::vector::db::IndexEmbeddingConfig;
|
||||
use processing::ProcessingTasks;
|
||||
@ -666,6 +667,16 @@ impl IndexScheduler {
|
||||
self.queue.get_task_ids_from_authorized_indexes(&rtxn, query, filters, &processing)
|
||||
}
|
||||
|
||||
pub fn set_task_network(&self, task_id: TaskId, network: TaskNetwork) -> Result<()> {
|
||||
let mut wtxn = self.env.write_txn()?;
|
||||
let mut task =
|
||||
self.queue.tasks.get_task(&wtxn, task_id)?.ok_or(Error::TaskNotFound(task_id))?;
|
||||
task.network = Some(network);
|
||||
self.queue.tasks.all_tasks.put(&mut wtxn, &task_id, &task)?;
|
||||
wtxn.commit()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Return the batches matching the query from the user's point of view along
|
||||
/// with the total number of batches matching the query, ignoring from and limit.
|
||||
///
|
||||
|
@ -279,6 +279,7 @@ impl Queue {
|
||||
details: kind.default_details(),
|
||||
status: Status::Enqueued,
|
||||
kind: kind.clone(),
|
||||
network: None,
|
||||
};
|
||||
// For deletion and cancelation tasks, we want to make extra sure that they
|
||||
// don't attempt to delete/cancel tasks that are newer than themselves.
|
||||
|
@ -97,7 +97,22 @@ impl TaskQueue {
|
||||
Ok(self.all_tasks.get(rtxn, &task_id)?)
|
||||
}
|
||||
|
||||
pub(crate) fn update_task(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> {
|
||||
/// Update the inverted task indexes and write the new value of the task.
|
||||
///
|
||||
/// The passed `task` object typically comes from a previous transaction, so two kinds of modification might have occurred:
|
||||
/// 1. Modification to the `task` object after loading it from the DB (the purpose of this method is to persist these changes)
|
||||
/// 2. Modification to the task committed by another transaction in the DB (an annoying consequence of having lost the original
|
||||
/// transaction from which the `task` instance was deserialized)
|
||||
///
|
||||
/// When calling this function, this `task` is modified to take into account any existing `network`
|
||||
/// that can have been added since the task was loaded into memory.
|
||||
///
|
||||
/// Any other modification to the task that was committed from the DB since the parameter was pulled from the DB will be overwritten.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// - CorruptedTaskQueue: The task doesn't exist in the database
|
||||
pub(crate) fn update_task(&self, wtxn: &mut RwTxn, task: &mut Task) -> Result<()> {
|
||||
let old_task = self.get_task(wtxn, task.uid)?.ok_or(Error::CorruptedTaskQueue)?;
|
||||
let reprocessing = old_task.status != Status::Enqueued;
|
||||
|
||||
@ -157,6 +172,12 @@ impl TaskQueue {
|
||||
}
|
||||
}
|
||||
|
||||
task.network = match (old_task.network, task.network.take()) {
|
||||
(None, None) => None,
|
||||
(None, Some(network)) | (Some(network), None) => Some(network),
|
||||
(Some(_), Some(network)) => Some(network),
|
||||
};
|
||||
|
||||
self.all_tasks.put(wtxn, &task.uid, task)?;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -268,7 +268,7 @@ impl IndexScheduler {
|
||||
|
||||
self.queue
|
||||
.tasks
|
||||
.update_task(&mut wtxn, &task)
|
||||
.update_task(&mut wtxn, &mut task)
|
||||
.map_err(|e| Error::UnrecoverableError(Box::new(e)))?;
|
||||
}
|
||||
if let Some(canceled_by) = canceled_by {
|
||||
@ -349,7 +349,7 @@ impl IndexScheduler {
|
||||
|
||||
self.queue
|
||||
.tasks
|
||||
.update_task(&mut wtxn, &task)
|
||||
.update_task(&mut wtxn, &mut task)
|
||||
.map_err(|e| Error::UnrecoverableError(Box::new(e)))?;
|
||||
}
|
||||
}
|
||||
|
@ -66,6 +66,11 @@ impl IndexScheduler {
|
||||
}
|
||||
IndexOperation::DocumentOperation { index_uid, primary_key, operations, mut tasks } => {
|
||||
progress.update_progress(DocumentOperationProgress::RetrievingConfig);
|
||||
|
||||
let network = self.network();
|
||||
|
||||
let shards = network.shards();
|
||||
|
||||
// TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches.
|
||||
// this is made difficult by the fact we're doing private clones of the index scheduler and sending it
|
||||
// to a fresh thread.
|
||||
@ -130,6 +135,7 @@ impl IndexScheduler {
|
||||
&mut new_fields_ids_map,
|
||||
&|| must_stop_processing.get(),
|
||||
progress.clone(),
|
||||
shards.as_ref(),
|
||||
)
|
||||
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;
|
||||
|
||||
|
@ -91,6 +91,7 @@ pub fn upgrade_index_scheduler(
|
||||
details: Some(Details::UpgradeDatabase { from, to }),
|
||||
status: Status::Enqueued,
|
||||
kind: KindWithContent::UpgradeDatabase { from },
|
||||
network: None,
|
||||
},
|
||||
)?;
|
||||
wtxn.commit()?;
|
||||
|
@ -1,6 +1,5 @@
|
||||
//! Utility functions on the DBs. Mainly getter and setters.
|
||||
|
||||
use crate::milli::progress::EmbedderStats;
|
||||
use std::collections::{BTreeSet, HashSet};
|
||||
use std::ops::Bound;
|
||||
use std::sync::Arc;
|
||||
@ -15,6 +14,7 @@ use meilisearch_types::tasks::{
|
||||
use roaring::RoaringBitmap;
|
||||
use time::OffsetDateTime;
|
||||
|
||||
use crate::milli::progress::EmbedderStats;
|
||||
use crate::{Error, Result, Task, TaskId, BEI128};
|
||||
|
||||
/// This structure contains all the information required to write a batch in the database without reading the tasks.
|
||||
@ -377,6 +377,7 @@ impl crate::IndexScheduler {
|
||||
details,
|
||||
status,
|
||||
kind,
|
||||
network: _,
|
||||
} = task;
|
||||
assert_eq!(uid, task.uid);
|
||||
if task.status != Status::Enqueued {
|
||||
|
6
crates/meilisearch-types/src/enterprise_edition/mod.rs
Normal file
6
crates/meilisearch-types/src/enterprise_edition/mod.rs
Normal file
@ -0,0 +1,6 @@
|
||||
// Copyright © 2025 Meilisearch Some Rights Reserved
|
||||
// This file is part of Meilisearch Enterprise Edition (EE).
|
||||
// Use of this source code is governed by the Business Source License 1.1,
|
||||
// as found in the LICENSE-EE file or at <https://mariadb.com/bsl11>
|
||||
|
||||
pub mod network;
|
47
crates/meilisearch-types/src/enterprise_edition/network.rs
Normal file
47
crates/meilisearch-types/src/enterprise_edition/network.rs
Normal file
@ -0,0 +1,47 @@
|
||||
// Copyright © 2025 Meilisearch Some Rights Reserved
|
||||
// This file is part of Meilisearch Enterprise Edition (EE).
|
||||
// Use of this source code is governed by the Business Source License 1.1,
|
||||
// as found in the LICENSE-EE file or at <https://mariadb.com/bsl11>
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use milli::update::new::indexer::enterprise_edition::sharding::Shards;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Network {
|
||||
#[serde(default, rename = "self")]
|
||||
pub local: Option<String>,
|
||||
#[serde(default)]
|
||||
pub remotes: BTreeMap<String, Remote>,
|
||||
#[serde(default)]
|
||||
pub sharding: bool,
|
||||
}
|
||||
|
||||
impl Network {
|
||||
pub fn shards(&self) -> Option<Shards> {
|
||||
if self.sharding {
|
||||
let this = self.local.as_deref().expect("Inconsistent `sharding` and `self`");
|
||||
let others = self
|
||||
.remotes
|
||||
.keys()
|
||||
.filter(|name| name.as_str() != this)
|
||||
.map(|name| name.to_owned())
|
||||
.collect();
|
||||
Some(Shards { own: vec![this.to_owned()], others })
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Remote {
|
||||
pub url: String,
|
||||
#[serde(default)]
|
||||
pub search_api_key: Option<String>,
|
||||
#[serde(default)]
|
||||
pub write_api_key: Option<String>,
|
||||
}
|
@ -235,9 +235,11 @@ InvalidDocumentFields , InvalidRequest , BAD_REQU
|
||||
InvalidDocumentRetrieveVectors , InvalidRequest , BAD_REQUEST ;
|
||||
MissingDocumentFilter , InvalidRequest , BAD_REQUEST ;
|
||||
MissingDocumentEditionFunction , InvalidRequest , BAD_REQUEST ;
|
||||
InconsistentDocumentChangeHeaders , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidDocumentFilter , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidDocumentSort , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidDocumentGeoField , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidHeaderValue , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidVectorDimensions , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidVectorsType , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidDocumentId , InvalidRequest , BAD_REQUEST ;
|
||||
@ -266,7 +268,9 @@ InvalidMultiSearchRemote , InvalidRequest , BAD_REQU
|
||||
InvalidMultiSearchWeight , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidNetworkRemotes , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidNetworkSelf , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidNetworkSharding , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidNetworkSearchApiKey , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidNetworkWriteApiKey , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidNetworkUrl , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidSearchAttributesToSearchOn , InvalidRequest , BAD_REQUEST ;
|
||||
InvalidSearchAttributesToCrop , InvalidRequest , BAD_REQUEST ;
|
||||
|
@ -1,5 +1,3 @@
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::error::{Code, ResponseError};
|
||||
@ -32,23 +30,6 @@ pub struct InstanceTogglableFeatures {
|
||||
pub contains_filter: bool,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Remote {
|
||||
pub url: String,
|
||||
#[serde(default)]
|
||||
pub search_api_key: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Network {
|
||||
#[serde(default, rename = "self")]
|
||||
pub local: Option<String>,
|
||||
#[serde(default)]
|
||||
pub remotes: BTreeMap<String, Remote>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ChatCompletionSettings {
|
||||
|
@ -3,6 +3,7 @@ pub mod batches;
|
||||
pub mod compression;
|
||||
pub mod deserr;
|
||||
pub mod document_formats;
|
||||
pub mod enterprise_edition;
|
||||
pub mod error;
|
||||
pub mod facet_values_sort;
|
||||
pub mod features;
|
||||
|
@ -11,6 +11,7 @@ use crate::error::ResponseError;
|
||||
use crate::settings::{Settings, Unchecked};
|
||||
use crate::tasks::{
|
||||
serialize_duration, Details, DetailsExportIndexSettings, IndexSwap, Kind, Status, Task, TaskId,
|
||||
TaskNetwork,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, ToSchema)]
|
||||
@ -51,6 +52,9 @@ pub struct TaskView {
|
||||
#[schema(value_type = String, example = json!("2024-08-08_14:12:09.393Z"))]
|
||||
#[serde(with = "time::serde::rfc3339::option", default)]
|
||||
pub finished_at: Option<OffsetDateTime>,
|
||||
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub network: Option<TaskNetwork>,
|
||||
}
|
||||
|
||||
impl TaskView {
|
||||
@ -68,6 +72,7 @@ impl TaskView {
|
||||
enqueued_at: task.enqueued_at,
|
||||
started_at: task.started_at,
|
||||
finished_at: task.finished_at,
|
||||
network: task.network.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -42,6 +42,9 @@ pub struct Task {
|
||||
|
||||
pub status: Status,
|
||||
pub kind: KindWithContent,
|
||||
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub network: Option<TaskNetwork>,
|
||||
}
|
||||
|
||||
impl Task {
|
||||
@ -737,6 +740,36 @@ pub enum Details {
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, ToSchema)]
|
||||
#[serde(untagged, rename_all = "camelCase")]
|
||||
pub enum TaskNetwork {
|
||||
Origin { origin: Origin },
|
||||
Remotes { remote_tasks: BTreeMap<String, RemoteTask> },
|
||||
}
|
||||
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, ToSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Origin {
|
||||
pub remote_name: String,
|
||||
pub task_uid: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, ToSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct RemoteTask {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
task_uid: Option<TaskId>,
|
||||
error: Option<ResponseError>,
|
||||
}
|
||||
|
||||
impl From<Result<TaskId, ResponseError>> for RemoteTask {
|
||||
fn from(res: Result<TaskId, ResponseError>) -> RemoteTask {
|
||||
match res {
|
||||
Ok(task_uid) => RemoteTask { task_uid: Some(task_uid), error: None },
|
||||
Err(err) => RemoteTask { task_uid: None, error: Some(err) },
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, ToSchema)]
|
||||
#[schema(rename_all = "camelCase")]
|
||||
pub struct DetailsExportIndexSettings {
|
||||
|
@ -115,6 +115,9 @@ utoipa-scalar = { version = "0.3.0", optional = true, features = ["actix-web"] }
|
||||
async-openai = { git = "https://github.com/meilisearch/async-openai", branch = "better-error-handling" }
|
||||
secrecy = "0.10.3"
|
||||
actix-web-lab = { version = "0.24.1", default-features = false }
|
||||
urlencoding = "2.1.3"
|
||||
backoff = { version = "0.4.0", features = ["tokio"] }
|
||||
|
||||
|
||||
[dev-dependencies]
|
||||
actix-rt = "2.10.0"
|
||||
@ -125,7 +128,6 @@ manifest-dir-macros = "0.1.18"
|
||||
maplit = "1.0.2"
|
||||
meili-snap = { path = "../meili-snap" }
|
||||
temp-env = "0.3.6"
|
||||
urlencoding = "2.1.3"
|
||||
wiremock = "0.6.3"
|
||||
yaup = "0.3.1"
|
||||
|
||||
|
@ -9,6 +9,8 @@ use meilisearch_types::milli::OrderBy;
|
||||
use serde_json::Value;
|
||||
use tokio::task::JoinError;
|
||||
|
||||
use crate::routes::indexes::{PROXY_ORIGIN_REMOTE_HEADER, PROXY_ORIGIN_TASK_UID_HEADER};
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum MeilisearchHttpError {
|
||||
#[error("A Content-Type header is missing. Accepted values for the Content-Type header are: {}",
|
||||
@ -80,6 +82,16 @@ pub enum MeilisearchHttpError {
|
||||
MissingSearchHybrid,
|
||||
#[error("Invalid request: both `media` and `vector` parameters are present.")]
|
||||
MediaAndVector,
|
||||
#[error("Inconsistent `Origin` headers: {} was provided but {} is missing.\n - Hint: Either both headers should be provided, or none of them", if *is_remote_missing {
|
||||
PROXY_ORIGIN_TASK_UID_HEADER
|
||||
} else { PROXY_ORIGIN_REMOTE_HEADER },
|
||||
if *is_remote_missing {
|
||||
PROXY_ORIGIN_REMOTE_HEADER
|
||||
} else { PROXY_ORIGIN_TASK_UID_HEADER }
|
||||
)]
|
||||
InconsistentOriginHeaders { is_remote_missing: bool },
|
||||
#[error("Invalid value for header {header_name}: {msg}")]
|
||||
InvalidHeaderValue { header_name: &'static str, msg: String },
|
||||
}
|
||||
|
||||
impl MeilisearchHttpError {
|
||||
@ -124,6 +136,10 @@ impl ErrorCode for MeilisearchHttpError {
|
||||
MeilisearchHttpError::InconsistentFacetOrder { .. } => {
|
||||
Code::InvalidMultiSearchFacetOrder
|
||||
}
|
||||
MeilisearchHttpError::InconsistentOriginHeaders { .. } => {
|
||||
Code::InconsistentDocumentChangeHeaders
|
||||
}
|
||||
MeilisearchHttpError::InvalidHeaderValue { .. } => Code::InvalidHeaderValue,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -628,6 +628,7 @@ fn import_dump(
|
||||
&mut new_fields_ids_map,
|
||||
&|| false, // never stop processing a dump
|
||||
progress.clone(),
|
||||
None,
|
||||
)?;
|
||||
|
||||
let operation_stats = operation_stats.pop().unwrap();
|
||||
|
@ -45,6 +45,7 @@ use crate::extractors::authentication::policies::*;
|
||||
use crate::extractors::authentication::GuardedData;
|
||||
use crate::extractors::payload::Payload;
|
||||
use crate::extractors::sequential_extractor::SeqHandler;
|
||||
use crate::routes::indexes::enterprise_edition::proxy::{proxy, Body};
|
||||
use crate::routes::indexes::search::fix_sort_query_parameters;
|
||||
use crate::routes::{
|
||||
get_task_id, is_dry_run, PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT,
|
||||
@ -338,6 +339,7 @@ pub async fn delete_document(
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
let DocumentParam { index_uid, document_id } = path.into_inner();
|
||||
let index_uid = IndexUid::try_from(index_uid)?;
|
||||
let network = index_scheduler.network();
|
||||
|
||||
analytics.publish(
|
||||
DocumentsDeletionAggregator {
|
||||
@ -355,10 +357,16 @@ pub async fn delete_document(
|
||||
};
|
||||
let uid = get_task_id(&req, &opt)?;
|
||||
let dry_run = is_dry_run(&req, &opt)?;
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
|
||||
.await??
|
||||
.into();
|
||||
let task = {
|
||||
let index_scheduler = index_scheduler.clone();
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
|
||||
};
|
||||
|
||||
if network.sharding && !dry_run {
|
||||
proxy(&index_scheduler, &index_uid, &req, network, Body::none(), &task).await?;
|
||||
}
|
||||
|
||||
let task: SummarizedTaskView = task.into();
|
||||
debug!("returns: {:?}", task);
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
}
|
||||
@ -804,7 +812,6 @@ pub async fn replace_documents(
|
||||
let uid = get_task_id(&req, &opt)?;
|
||||
let dry_run = is_dry_run(&req, &opt)?;
|
||||
let task = document_addition(
|
||||
extract_mime_type(&req)?,
|
||||
index_scheduler,
|
||||
index_uid,
|
||||
params.primary_key,
|
||||
@ -814,8 +821,10 @@ pub async fn replace_documents(
|
||||
uid,
|
||||
dry_run,
|
||||
allow_index_creation,
|
||||
&req,
|
||||
)
|
||||
.await?;
|
||||
|
||||
debug!(returns = ?task, "Replace documents");
|
||||
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
@ -905,7 +914,6 @@ pub async fn update_documents(
|
||||
let uid = get_task_id(&req, &opt)?;
|
||||
let dry_run = is_dry_run(&req, &opt)?;
|
||||
let task = document_addition(
|
||||
extract_mime_type(&req)?,
|
||||
index_scheduler,
|
||||
index_uid,
|
||||
params.primary_key,
|
||||
@ -915,6 +923,7 @@ pub async fn update_documents(
|
||||
uid,
|
||||
dry_run,
|
||||
allow_index_creation,
|
||||
&req,
|
||||
)
|
||||
.await?;
|
||||
debug!(returns = ?task, "Update documents");
|
||||
@ -924,7 +933,6 @@ pub async fn update_documents(
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn document_addition(
|
||||
mime_type: Option<Mime>,
|
||||
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ADD }>, Data<IndexScheduler>>,
|
||||
index_uid: IndexUid,
|
||||
primary_key: Option<String>,
|
||||
@ -934,7 +942,11 @@ async fn document_addition(
|
||||
task_id: Option<TaskId>,
|
||||
dry_run: bool,
|
||||
allow_index_creation: bool,
|
||||
req: &HttpRequest,
|
||||
) -> Result<SummarizedTaskView, MeilisearchHttpError> {
|
||||
let mime_type = extract_mime_type(req)?;
|
||||
let network = index_scheduler.network();
|
||||
|
||||
let format = match (
|
||||
mime_type.as_ref().map(|m| (m.type_().as_str(), m.subtype().as_str())),
|
||||
csv_delimiter,
|
||||
@ -966,7 +978,7 @@ async fn document_addition(
|
||||
};
|
||||
|
||||
let (uuid, mut update_file) = index_scheduler.queue.create_update_file(dry_run)?;
|
||||
let documents_count = match format {
|
||||
let res = match format {
|
||||
PayloadType::Ndjson => {
|
||||
let (path, file) = update_file.into_parts();
|
||||
let file = match file {
|
||||
@ -981,19 +993,19 @@ async fn document_addition(
|
||||
None => None,
|
||||
};
|
||||
|
||||
let documents_count = tokio::task::spawn_blocking(move || {
|
||||
let res = tokio::task::spawn_blocking(move || {
|
||||
let documents_count = file.as_ref().map_or(Ok(0), |ntf| {
|
||||
read_ndjson(ntf.as_file()).map_err(MeilisearchHttpError::DocumentFormat)
|
||||
})?;
|
||||
|
||||
let update_file = file_store::File::from_parts(path, file);
|
||||
update_file.persist()?;
|
||||
let update_file = update_file.persist()?;
|
||||
|
||||
Ok(documents_count)
|
||||
Ok((documents_count, update_file))
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(documents_count)
|
||||
Ok(res)
|
||||
}
|
||||
PayloadType::Json | PayloadType::Csv { delimiter: _ } => {
|
||||
let temp_file = match tempfile() {
|
||||
@ -1012,16 +1024,16 @@ async fn document_addition(
|
||||
unreachable!("We already wrote the user content into the update file")
|
||||
}
|
||||
};
|
||||
// we NEED to persist the file here because we moved the `udpate_file` in another task.
|
||||
update_file.persist()?;
|
||||
Ok(documents_count)
|
||||
// we NEED to persist the file here because we moved the `update_file` in another task.
|
||||
let file = update_file.persist()?;
|
||||
Ok((documents_count, file))
|
||||
})
|
||||
.await
|
||||
}
|
||||
};
|
||||
|
||||
let documents_count = match documents_count {
|
||||
Ok(Ok(documents_count)) => documents_count,
|
||||
let (documents_count, file) = match res {
|
||||
Ok(Ok((documents_count, file))) => (documents_count, file),
|
||||
// in this case the file has not possibly be persisted.
|
||||
Ok(Err(e)) => return Err(e),
|
||||
Err(e) => {
|
||||
@ -1063,6 +1075,20 @@ async fn document_addition(
|
||||
}
|
||||
};
|
||||
|
||||
if network.sharding {
|
||||
if let Some(file) = file {
|
||||
proxy(
|
||||
&index_scheduler,
|
||||
&index_uid,
|
||||
req,
|
||||
network,
|
||||
Body::with_ndjson_payload(file),
|
||||
&task,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(task.into())
|
||||
}
|
||||
|
||||
@ -1141,6 +1167,7 @@ pub async fn delete_documents_batch(
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
debug!(parameters = ?body, "Delete documents by batch");
|
||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||
let network = index_scheduler.network();
|
||||
|
||||
analytics.publish(
|
||||
DocumentsDeletionAggregator {
|
||||
@ -1161,16 +1188,22 @@ pub async fn delete_documents_batch(
|
||||
KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids };
|
||||
let uid = get_task_id(&req, &opt)?;
|
||||
let dry_run = is_dry_run(&req, &opt)?;
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
|
||||
.await??
|
||||
.into();
|
||||
let task = {
|
||||
let index_scheduler = index_scheduler.clone();
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
|
||||
};
|
||||
|
||||
if network.sharding && !dry_run {
|
||||
proxy(&index_scheduler, &index_uid, &req, network, Body::Inline(body), &task).await?;
|
||||
}
|
||||
|
||||
let task: SummarizedTaskView = task.into();
|
||||
|
||||
debug!(returns = ?task, "Delete documents by batch");
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserr, ToSchema)]
|
||||
#[derive(Debug, Deserr, ToSchema, Serialize)]
|
||||
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
|
||||
#[schema(rename_all = "camelCase")]
|
||||
pub struct DocumentDeletionByFilter {
|
||||
@ -1219,7 +1252,8 @@ pub async fn delete_documents_by_filter(
|
||||
debug!(parameters = ?body, "Delete documents by filter");
|
||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||
let index_uid = index_uid.into_inner();
|
||||
let filter = body.into_inner().filter;
|
||||
let filter = body.into_inner();
|
||||
let network = index_scheduler.network();
|
||||
|
||||
analytics.publish(
|
||||
DocumentsDeletionAggregator {
|
||||
@ -1232,23 +1266,36 @@ pub async fn delete_documents_by_filter(
|
||||
);
|
||||
|
||||
// we ensure the filter is well formed before enqueuing it
|
||||
crate::search::parse_filter(&filter, Code::InvalidDocumentFilter, index_scheduler.features())?
|
||||
.ok_or(MeilisearchHttpError::EmptyFilter)?;
|
||||
crate::search::parse_filter(
|
||||
&filter.filter,
|
||||
Code::InvalidDocumentFilter,
|
||||
index_scheduler.features(),
|
||||
)?
|
||||
.ok_or(MeilisearchHttpError::EmptyFilter)?;
|
||||
|
||||
let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter };
|
||||
let task = KindWithContent::DocumentDeletionByFilter {
|
||||
index_uid: index_uid.clone(),
|
||||
filter_expr: filter.filter.clone(),
|
||||
};
|
||||
|
||||
let uid = get_task_id(&req, &opt)?;
|
||||
let dry_run = is_dry_run(&req, &opt)?;
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
|
||||
.await??
|
||||
.into();
|
||||
let task = {
|
||||
let index_scheduler = index_scheduler.clone();
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
|
||||
};
|
||||
|
||||
if network.sharding && !dry_run {
|
||||
proxy(&index_scheduler, &index_uid, &req, network, Body::Inline(filter), &task).await?;
|
||||
}
|
||||
|
||||
let task: SummarizedTaskView = task.into();
|
||||
|
||||
debug!(returns = ?task, "Delete documents by filter");
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserr, ToSchema)]
|
||||
#[derive(Debug, Deserr, ToSchema, Serialize)]
|
||||
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
|
||||
pub struct DocumentEditionByFunction {
|
||||
/// A string containing a RHAI function.
|
||||
@ -1336,6 +1383,8 @@ pub async fn edit_documents_by_function(
|
||||
.features()
|
||||
.check_edit_documents_by_function("Using the documents edit route")?;
|
||||
|
||||
let network = index_scheduler.network();
|
||||
|
||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||
let index_uid = index_uid.into_inner();
|
||||
let params = params.into_inner();
|
||||
@ -1349,13 +1398,12 @@ pub async fn edit_documents_by_function(
|
||||
&req,
|
||||
);
|
||||
|
||||
let DocumentEditionByFunction { filter, context, function } = params;
|
||||
let engine = milli::rhai::Engine::new();
|
||||
if let Err(e) = engine.compile(&function) {
|
||||
if let Err(e) = engine.compile(¶ms.function) {
|
||||
return Err(ResponseError::from_msg(e.to_string(), Code::BadRequest));
|
||||
}
|
||||
|
||||
if let Some(ref filter) = filter {
|
||||
if let Some(ref filter) = params.filter {
|
||||
// we ensure the filter is well formed before enqueuing it
|
||||
crate::search::parse_filter(
|
||||
filter,
|
||||
@ -1365,9 +1413,9 @@ pub async fn edit_documents_by_function(
|
||||
.ok_or(MeilisearchHttpError::EmptyFilter)?;
|
||||
}
|
||||
let task = KindWithContent::DocumentEdition {
|
||||
index_uid,
|
||||
filter_expr: filter,
|
||||
context: match context {
|
||||
index_uid: index_uid.clone(),
|
||||
filter_expr: params.filter.clone(),
|
||||
context: match params.context.clone() {
|
||||
Some(Value::Object(m)) => Some(m),
|
||||
None => None,
|
||||
_ => {
|
||||
@ -1377,15 +1425,21 @@ pub async fn edit_documents_by_function(
|
||||
))
|
||||
}
|
||||
},
|
||||
function,
|
||||
function: params.function.clone(),
|
||||
};
|
||||
|
||||
let uid = get_task_id(&req, &opt)?;
|
||||
let dry_run = is_dry_run(&req, &opt)?;
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
|
||||
.await??
|
||||
.into();
|
||||
let task = {
|
||||
let index_scheduler = index_scheduler.clone();
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
|
||||
};
|
||||
|
||||
if network.sharding && !dry_run {
|
||||
proxy(&index_scheduler, &index_uid, &req, network, Body::Inline(params), &task).await?;
|
||||
}
|
||||
|
||||
let task: SummarizedTaskView = task.into();
|
||||
|
||||
debug!(returns = ?task, "Edit documents by function");
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
@ -1428,6 +1482,8 @@ pub async fn clear_all_documents(
|
||||
analytics: web::Data<Analytics>,
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||
let network = index_scheduler.network();
|
||||
|
||||
analytics.publish(
|
||||
DocumentsDeletionAggregator {
|
||||
clear_all: true,
|
||||
@ -1441,10 +1497,18 @@ pub async fn clear_all_documents(
|
||||
let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() };
|
||||
let uid = get_task_id(&req, &opt)?;
|
||||
let dry_run = is_dry_run(&req, &opt)?;
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
|
||||
.await??
|
||||
.into();
|
||||
|
||||
let task = {
|
||||
let index_scheduler = index_scheduler.clone();
|
||||
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
|
||||
};
|
||||
|
||||
if network.sharding && !dry_run {
|
||||
proxy(&index_scheduler, &index_uid, &req, network, Body::none(), &task).await?;
|
||||
}
|
||||
|
||||
let task: SummarizedTaskView = task.into();
|
||||
|
||||
debug!(returns = ?task, "Delete all documents");
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
|
@ -0,0 +1,6 @@
|
||||
// Copyright © 2025 Meilisearch Some Rights Reserved
|
||||
// This file is part of Meilisearch Enterprise Edition (EE).
|
||||
// Use of this source code is governed by the Business Source License 1.1,
|
||||
// as found in the LICENSE-EE file or at <https://mariadb.com/bsl11>
|
||||
|
||||
pub mod proxy;
|
@ -0,0 +1,426 @@
|
||||
// Copyright © 2025 Meilisearch Some Rights Reserved
|
||||
// This file is part of Meilisearch Enterprise Edition (EE).
|
||||
// Use of this source code is governed by the Business Source License 1.1,
|
||||
// as found in the LICENSE-EE file or at <https://mariadb.com/bsl11>
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::fs::File;
|
||||
|
||||
use actix_web::http::header::CONTENT_TYPE;
|
||||
use actix_web::HttpRequest;
|
||||
use bytes::Bytes;
|
||||
use index_scheduler::IndexScheduler;
|
||||
use meilisearch_types::error::ResponseError;
|
||||
use meilisearch_types::tasks::{Origin, RemoteTask, TaskNetwork};
|
||||
use reqwest::StatusCode;
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde_json::Value;
|
||||
|
||||
use crate::error::MeilisearchHttpError;
|
||||
use crate::routes::indexes::enterprise_edition::proxy::error::{
|
||||
ProxyDocumentChangeError, ReqwestErrorWithoutUrl,
|
||||
};
|
||||
use crate::routes::SummarizedTaskView;
|
||||
|
||||
pub enum Body<T: serde::Serialize> {
|
||||
NdJsonPayload(File),
|
||||
Inline(T),
|
||||
None,
|
||||
}
|
||||
|
||||
impl Body<()> {
|
||||
pub fn with_ndjson_payload(file: File) -> Self {
|
||||
Self::NdJsonPayload(file)
|
||||
}
|
||||
|
||||
pub fn none() -> Self {
|
||||
Self::None
|
||||
}
|
||||
}
|
||||
|
||||
/// If necessary, proxies the passed request to the network and update the task description.
|
||||
///
|
||||
/// This function reads the custom headers from the request to determine if must proxy the request or if the request
|
||||
/// has already been proxied.
|
||||
///
|
||||
/// - when it must proxy the request, the endpoint, method and query params are retrieved from the passed `req`, then the `body` is
|
||||
/// sent to all remotes of the `network` (except `self`). The response from the remotes are collected to update the passed `task`
|
||||
/// with the task ids from the task queues of the remotes.
|
||||
/// - when the request has already been proxied, the custom headers contains information about the remote that created the initial task.
|
||||
/// This information is copied to the passed task.
|
||||
pub async fn proxy<T: serde::Serialize>(
|
||||
index_scheduler: &IndexScheduler,
|
||||
index_uid: &str,
|
||||
req: &HttpRequest,
|
||||
network: meilisearch_types::enterprise_edition::network::Network,
|
||||
body: Body<T>,
|
||||
task: &meilisearch_types::tasks::Task,
|
||||
) -> Result<(), MeilisearchHttpError> {
|
||||
match origin_from_req(req)? {
|
||||
Some(origin) => {
|
||||
index_scheduler.set_task_network(task.uid, TaskNetwork::Origin { origin })?
|
||||
}
|
||||
None => {
|
||||
let this = network
|
||||
.local
|
||||
.as_deref()
|
||||
.expect("inconsistent `network.sharding` and `network.self`")
|
||||
.to_owned();
|
||||
|
||||
let content_type = match &body {
|
||||
// for file bodies, force x-ndjson
|
||||
Body::NdJsonPayload(_) => Some(b"application/x-ndjson".as_slice()),
|
||||
// otherwise get content type from request
|
||||
_ => req.headers().get(CONTENT_TYPE).map(|h| h.as_bytes()),
|
||||
};
|
||||
|
||||
let body = match body {
|
||||
Body::NdJsonPayload(file) => Some(Bytes::from_owner(unsafe {
|
||||
memmap2::Mmap::map(&file).map_err(|err| {
|
||||
MeilisearchHttpError::from_milli(err.into(), Some(index_uid.to_owned()))
|
||||
})?
|
||||
})),
|
||||
|
||||
Body::Inline(payload) => {
|
||||
Some(Bytes::copy_from_slice(&serde_json::to_vec(&payload).unwrap()))
|
||||
}
|
||||
|
||||
Body::None => None,
|
||||
};
|
||||
|
||||
let mut in_flight_remote_queries = BTreeMap::new();
|
||||
let client = reqwest::ClientBuilder::new()
|
||||
.connect_timeout(std::time::Duration::from_secs(3))
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let method = from_old_http_method(req.method());
|
||||
|
||||
// send payload to all remotes
|
||||
for (node_name, node) in
|
||||
network.remotes.into_iter().filter(|(name, _)| name.as_str() != this)
|
||||
{
|
||||
let body = body.clone();
|
||||
let client = client.clone();
|
||||
let api_key = node.write_api_key;
|
||||
let this = this.clone();
|
||||
let method = method.clone();
|
||||
let path_and_query =
|
||||
req.uri().path_and_query().map(|paq| paq.as_str()).unwrap_or("/");
|
||||
|
||||
in_flight_remote_queries.insert(
|
||||
node_name,
|
||||
tokio::spawn({
|
||||
let url = format!("{}{}", node.url, path_and_query);
|
||||
|
||||
let url_encoded_this = urlencoding::encode(&this).into_owned();
|
||||
let url_encoded_task_uid = task.uid.to_string(); // it's url encoded i promize
|
||||
|
||||
let content_type = content_type.map(|b| b.to_owned());
|
||||
|
||||
let backoff = backoff::ExponentialBackoffBuilder::new()
|
||||
.with_max_elapsed_time(Some(std::time::Duration::from_secs(25)))
|
||||
.build();
|
||||
|
||||
backoff::future::retry(backoff, move || {
|
||||
let url = url.clone();
|
||||
let client = client.clone();
|
||||
let url_encoded_this = url_encoded_this.clone();
|
||||
let url_encoded_task_uid = url_encoded_task_uid.clone();
|
||||
let content_type = content_type.clone();
|
||||
|
||||
let body = body.clone();
|
||||
let api_key = api_key.clone();
|
||||
let method = method.clone();
|
||||
|
||||
async move {
|
||||
try_proxy(
|
||||
method,
|
||||
&url,
|
||||
content_type.as_deref(),
|
||||
api_key.as_deref(),
|
||||
&client,
|
||||
&url_encoded_this,
|
||||
&url_encoded_task_uid,
|
||||
body,
|
||||
)
|
||||
.await
|
||||
}
|
||||
})
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
// wait for all in-flight queries to finish and collect their results
|
||||
let mut remote_tasks: BTreeMap<String, RemoteTask> = BTreeMap::new();
|
||||
for (node_name, handle) in in_flight_remote_queries {
|
||||
match handle.await {
|
||||
Ok(Ok(res)) => {
|
||||
let task_uid = res.task_uid;
|
||||
|
||||
remote_tasks.insert(node_name, Ok(task_uid).into());
|
||||
}
|
||||
Ok(Err(error)) => {
|
||||
remote_tasks.insert(node_name, Err(error.as_response_error()).into());
|
||||
}
|
||||
Err(panic) => match panic.try_into_panic() {
|
||||
Ok(panic) => {
|
||||
let msg = match panic.downcast_ref::<&'static str>() {
|
||||
Some(s) => *s,
|
||||
None => match panic.downcast_ref::<String>() {
|
||||
Some(s) => &s[..],
|
||||
None => "Box<dyn Any>",
|
||||
},
|
||||
};
|
||||
remote_tasks.insert(
|
||||
node_name,
|
||||
Err(ResponseError::from_msg(
|
||||
msg.to_string(),
|
||||
meilisearch_types::error::Code::Internal,
|
||||
))
|
||||
.into(),
|
||||
);
|
||||
}
|
||||
Err(_) => {
|
||||
tracing::error!("proxy task was unexpectedly cancelled")
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// edit details to contain the return values from the remotes
|
||||
index_scheduler.set_task_network(task.uid, TaskNetwork::Remotes { remote_tasks })?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn from_old_http_method(method: &actix_http::Method) -> reqwest::Method {
|
||||
match method {
|
||||
&actix_http::Method::CONNECT => reqwest::Method::CONNECT,
|
||||
&actix_http::Method::DELETE => reqwest::Method::DELETE,
|
||||
&actix_http::Method::GET => reqwest::Method::GET,
|
||||
&actix_http::Method::HEAD => reqwest::Method::HEAD,
|
||||
&actix_http::Method::OPTIONS => reqwest::Method::OPTIONS,
|
||||
&actix_http::Method::PATCH => reqwest::Method::PATCH,
|
||||
&actix_http::Method::POST => reqwest::Method::POST,
|
||||
&actix_http::Method::PUT => reqwest::Method::PUT,
|
||||
&actix_http::Method::TRACE => reqwest::Method::TRACE,
|
||||
method => reqwest::Method::from_bytes(method.as_str().as_bytes()).unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn try_proxy(
|
||||
method: reqwest::Method,
|
||||
url: &str,
|
||||
content_type: Option<&[u8]>,
|
||||
api_key: Option<&str>,
|
||||
client: &reqwest::Client,
|
||||
url_encoded_this: &str,
|
||||
url_encoded_task_uid: &str,
|
||||
body: Option<Bytes>,
|
||||
) -> Result<SummarizedTaskView, backoff::Error<ProxyDocumentChangeError>> {
|
||||
let request = client.request(method, url).timeout(std::time::Duration::from_secs(30));
|
||||
let request = if let Some(body) = body { request.body(body) } else { request };
|
||||
let request = if let Some(api_key) = api_key { request.bearer_auth(api_key) } else { request };
|
||||
let request = request.header(PROXY_ORIGIN_TASK_UID_HEADER, url_encoded_task_uid);
|
||||
let request = request.header(PROXY_ORIGIN_REMOTE_HEADER, url_encoded_this);
|
||||
let request = if let Some(content_type) = content_type {
|
||||
request.header(CONTENT_TYPE.as_str(), content_type)
|
||||
} else {
|
||||
request
|
||||
};
|
||||
|
||||
let response = request.send().await;
|
||||
let response = match response {
|
||||
Ok(response) => response,
|
||||
Err(error) if error.is_timeout() => {
|
||||
return Err(backoff::Error::transient(ProxyDocumentChangeError::Timeout))
|
||||
}
|
||||
Err(error) => {
|
||||
return Err(backoff::Error::transient(ProxyDocumentChangeError::CouldNotSendRequest(
|
||||
ReqwestErrorWithoutUrl::new(error),
|
||||
)))
|
||||
}
|
||||
};
|
||||
|
||||
match response.status() {
|
||||
status_code if status_code.is_success() => (),
|
||||
StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
|
||||
return Err(backoff::Error::Permanent(ProxyDocumentChangeError::AuthenticationError))
|
||||
}
|
||||
status_code if status_code.is_client_error() => {
|
||||
let response = parse_error(response).await;
|
||||
return Err(backoff::Error::Permanent(ProxyDocumentChangeError::BadRequest {
|
||||
status_code,
|
||||
response,
|
||||
}));
|
||||
}
|
||||
status_code if status_code.is_server_error() => {
|
||||
let response = parse_error(response).await;
|
||||
return Err(backoff::Error::transient(ProxyDocumentChangeError::RemoteError {
|
||||
status_code,
|
||||
response,
|
||||
}));
|
||||
}
|
||||
status_code => {
|
||||
tracing::warn!(
|
||||
status_code = status_code.as_u16(),
|
||||
"remote replied with unexpected status code"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let response = match parse_response(response).await {
|
||||
Ok(response) => response,
|
||||
Err(response) => {
|
||||
return Err(backoff::Error::transient(
|
||||
ProxyDocumentChangeError::CouldNotParseResponse { response },
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
async fn parse_error(response: reqwest::Response) -> Result<String, ReqwestErrorWithoutUrl> {
|
||||
let bytes = match response.bytes().await {
|
||||
Ok(bytes) => bytes,
|
||||
Err(error) => return Err(ReqwestErrorWithoutUrl::new(error)),
|
||||
};
|
||||
|
||||
Ok(parse_bytes_as_error(&bytes))
|
||||
}
|
||||
|
||||
fn parse_bytes_as_error(bytes: &[u8]) -> String {
|
||||
match serde_json::from_slice::<Value>(bytes) {
|
||||
Ok(value) => value.to_string(),
|
||||
Err(_) => String::from_utf8_lossy(bytes).into_owned(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn parse_response<T: DeserializeOwned>(
|
||||
response: reqwest::Response,
|
||||
) -> Result<T, Result<String, ReqwestErrorWithoutUrl>> {
|
||||
let bytes = match response.bytes().await {
|
||||
Ok(bytes) => bytes,
|
||||
Err(error) => return Err(Err(ReqwestErrorWithoutUrl::new(error))),
|
||||
};
|
||||
|
||||
match serde_json::from_slice::<T>(&bytes) {
|
||||
Ok(value) => Ok(value),
|
||||
Err(_) => Err(Ok(parse_bytes_as_error(&bytes))),
|
||||
}
|
||||
}
|
||||
|
||||
mod error {
|
||||
use meilisearch_types::error::ResponseError;
|
||||
use reqwest::StatusCode;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum ProxyDocumentChangeError {
|
||||
#[error("{0}")]
|
||||
CouldNotSendRequest(ReqwestErrorWithoutUrl),
|
||||
#[error("could not authenticate against the remote host\n - hint: check that the remote instance was registered with a valid API key having the `documents.add` action")]
|
||||
AuthenticationError,
|
||||
#[error(
|
||||
"could not parse response from the remote host as a document addition response{}\n - hint: check that the remote instance is a Meilisearch instance running the same version",
|
||||
response_from_remote(response)
|
||||
)]
|
||||
CouldNotParseResponse { response: Result<String, ReqwestErrorWithoutUrl> },
|
||||
#[error("remote host responded with code {}{}\n - hint: check that the remote instance has the correct index configuration for that request\n - hint: check that the `network` experimental feature is enabled on the remote instance", status_code.as_u16(), response_from_remote(response))]
|
||||
BadRequest { status_code: StatusCode, response: Result<String, ReqwestErrorWithoutUrl> },
|
||||
#[error("remote host did not answer before the deadline")]
|
||||
Timeout,
|
||||
#[error("remote host responded with code {}{}", status_code.as_u16(), response_from_remote(response))]
|
||||
RemoteError { status_code: StatusCode, response: Result<String, ReqwestErrorWithoutUrl> },
|
||||
}
|
||||
|
||||
impl ProxyDocumentChangeError {
|
||||
pub fn as_response_error(&self) -> ResponseError {
|
||||
use meilisearch_types::error::Code;
|
||||
let message = self.to_string();
|
||||
let code = match self {
|
||||
ProxyDocumentChangeError::CouldNotSendRequest(_) => Code::RemoteCouldNotSendRequest,
|
||||
ProxyDocumentChangeError::AuthenticationError => Code::RemoteInvalidApiKey,
|
||||
ProxyDocumentChangeError::BadRequest { .. } => Code::RemoteBadRequest,
|
||||
ProxyDocumentChangeError::Timeout => Code::RemoteTimeout,
|
||||
ProxyDocumentChangeError::RemoteError { .. } => Code::RemoteRemoteError,
|
||||
ProxyDocumentChangeError::CouldNotParseResponse { .. } => Code::RemoteBadResponse,
|
||||
};
|
||||
ResponseError::from_msg(message, code)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[error(transparent)]
|
||||
pub struct ReqwestErrorWithoutUrl(reqwest::Error);
|
||||
impl ReqwestErrorWithoutUrl {
|
||||
pub fn new(inner: reqwest::Error) -> Self {
|
||||
Self(inner.without_url())
|
||||
}
|
||||
}
|
||||
|
||||
fn response_from_remote(response: &Result<String, ReqwestErrorWithoutUrl>) -> String {
|
||||
match response {
|
||||
Ok(response) => {
|
||||
format!(":\n - response from remote: {}", response)
|
||||
}
|
||||
Err(error) => {
|
||||
format!(":\n - additionally, could not retrieve response from remote: {error}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub const PROXY_ORIGIN_REMOTE_HEADER: &str = "Meili-Proxy-Origin-Remote";
|
||||
pub const PROXY_ORIGIN_TASK_UID_HEADER: &str = "Meili-Proxy-Origin-TaskUid";
|
||||
|
||||
pub fn origin_from_req(req: &HttpRequest) -> Result<Option<Origin>, MeilisearchHttpError> {
|
||||
let (remote_name, task_uid) = match (
|
||||
req.headers().get(PROXY_ORIGIN_REMOTE_HEADER),
|
||||
req.headers().get(PROXY_ORIGIN_TASK_UID_HEADER),
|
||||
) {
|
||||
(None, None) => return Ok(None),
|
||||
(None, Some(_)) => {
|
||||
return Err(MeilisearchHttpError::InconsistentOriginHeaders { is_remote_missing: true })
|
||||
}
|
||||
(Some(_), None) => {
|
||||
return Err(MeilisearchHttpError::InconsistentOriginHeaders {
|
||||
is_remote_missing: false,
|
||||
})
|
||||
}
|
||||
(Some(remote_name), Some(task_uid)) => (
|
||||
urlencoding::decode(remote_name.to_str().map_err(|err| {
|
||||
MeilisearchHttpError::InvalidHeaderValue {
|
||||
header_name: PROXY_ORIGIN_REMOTE_HEADER,
|
||||
msg: format!("while parsing remote name as UTF-8: {err}"),
|
||||
}
|
||||
})?)
|
||||
.map_err(|err| MeilisearchHttpError::InvalidHeaderValue {
|
||||
header_name: PROXY_ORIGIN_REMOTE_HEADER,
|
||||
msg: format!("while URL-decoding remote name: {err}"),
|
||||
})?,
|
||||
urlencoding::decode(task_uid.to_str().map_err(|err| {
|
||||
MeilisearchHttpError::InvalidHeaderValue {
|
||||
header_name: PROXY_ORIGIN_TASK_UID_HEADER,
|
||||
msg: format!("while parsing task UID as UTF-8: {err}"),
|
||||
}
|
||||
})?)
|
||||
.map_err(|err| MeilisearchHttpError::InvalidHeaderValue {
|
||||
header_name: PROXY_ORIGIN_TASK_UID_HEADER,
|
||||
msg: format!("while URL-decoding task UID: {err}"),
|
||||
})?,
|
||||
),
|
||||
};
|
||||
|
||||
let task_uid: usize =
|
||||
task_uid.parse().map_err(|err| MeilisearchHttpError::InvalidHeaderValue {
|
||||
header_name: PROXY_ORIGIN_TASK_UID_HEADER,
|
||||
msg: format!("while parsing the task UID as an integer: {err}"),
|
||||
})?;
|
||||
|
||||
Ok(Some(Origin { remote_name: remote_name.into_owned(), task_uid }))
|
||||
}
|
@ -29,6 +29,7 @@ use crate::routes::is_dry_run;
|
||||
use crate::Opt;
|
||||
|
||||
pub mod documents;
|
||||
mod enterprise_edition;
|
||||
pub mod facet_search;
|
||||
pub mod search;
|
||||
mod search_analytics;
|
||||
@ -39,6 +40,8 @@ mod settings_analytics;
|
||||
pub mod similar;
|
||||
mod similar_analytics;
|
||||
|
||||
pub use enterprise_edition::proxy::{PROXY_ORIGIN_REMOTE_HEADER, PROXY_ORIGIN_TASK_UID_HEADER};
|
||||
|
||||
#[derive(OpenApi)]
|
||||
#[openapi(
|
||||
nest(
|
||||
|
@ -184,7 +184,7 @@ pub fn is_dry_run(req: &HttpRequest, opt: &Opt) -> Result<bool, ResponseError> {
|
||||
.is_some_and(|s| s.to_lowercase() == "true"))
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, ToSchema)]
|
||||
#[derive(Debug, Serialize, Deserialize, ToSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct SummarizedTaskView {
|
||||
/// The task unique identifier.
|
||||
@ -198,7 +198,10 @@ pub struct SummarizedTaskView {
|
||||
#[serde(rename = "type")]
|
||||
kind: Kind,
|
||||
/// The date on which the task was enqueued.
|
||||
#[serde(serialize_with = "time::serde::rfc3339::serialize")]
|
||||
#[serde(
|
||||
serialize_with = "time::serde::rfc3339::serialize",
|
||||
deserialize_with = "time::serde::rfc3339::deserialize"
|
||||
)]
|
||||
enqueued_at: OffsetDateTime,
|
||||
}
|
||||
|
||||
|
@ -7,11 +7,12 @@ use deserr::Deserr;
|
||||
use index_scheduler::IndexScheduler;
|
||||
use itertools::{EitherOrBoth, Itertools};
|
||||
use meilisearch_types::deserr::DeserrJsonError;
|
||||
use meilisearch_types::enterprise_edition::network::{Network as DbNetwork, Remote as DbRemote};
|
||||
use meilisearch_types::error::deserr_codes::{
|
||||
InvalidNetworkRemotes, InvalidNetworkSearchApiKey, InvalidNetworkSelf, InvalidNetworkUrl,
|
||||
InvalidNetworkRemotes, InvalidNetworkSearchApiKey, InvalidNetworkSelf, InvalidNetworkSharding,
|
||||
InvalidNetworkUrl, InvalidNetworkWriteApiKey,
|
||||
};
|
||||
use meilisearch_types::error::ResponseError;
|
||||
use meilisearch_types::features::{Network as DbNetwork, Remote as DbRemote};
|
||||
use meilisearch_types::keys::actions;
|
||||
use meilisearch_types::milli::update::Setting;
|
||||
use serde::Serialize;
|
||||
@ -57,9 +58,9 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
|
||||
{
|
||||
"self": "ms-0",
|
||||
"remotes": {
|
||||
"ms-0": Remote { url: Setting::Set("http://localhost:7700".into()), search_api_key: Setting::Reset },
|
||||
"ms-1": Remote { url: Setting::Set("http://localhost:7701".into()), search_api_key: Setting::Set("foo".into()) },
|
||||
"ms-2": Remote { url: Setting::Set("http://localhost:7702".into()), search_api_key: Setting::Set("bar".into()) },
|
||||
"ms-0": Remote { url: Setting::Set("http://localhost:7700".into()), search_api_key: Setting::Reset, write_api_key: Setting::Reset },
|
||||
"ms-1": Remote { url: Setting::Set("http://localhost:7701".into()), search_api_key: Setting::Set("foo".into()), write_api_key: Setting::Set("bar".into()) },
|
||||
"ms-2": Remote { url: Setting::Set("http://localhost:7702".into()), search_api_key: Setting::Set("bar".into()), write_api_key: Setting::Set("foo".into()) },
|
||||
}
|
||||
})),
|
||||
(status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json", example = json!(
|
||||
@ -88,9 +89,9 @@ async fn get_network(
|
||||
#[schema(rename_all = "camelCase")]
|
||||
pub struct Remote {
|
||||
#[schema(value_type = Option<String>, example = json!({
|
||||
"ms-0": Remote { url: Setting::Set("http://localhost:7700".into()), search_api_key: Setting::Reset },
|
||||
"ms-1": Remote { url: Setting::Set("http://localhost:7701".into()), search_api_key: Setting::Set("foo".into()) },
|
||||
"ms-2": Remote { url: Setting::Set("http://localhost:7702".into()), search_api_key: Setting::Set("bar".into()) },
|
||||
"ms-0": Remote { url: Setting::Set("http://localhost:7700".into()), search_api_key: Setting::Reset, write_api_key: Setting::Reset },
|
||||
"ms-1": Remote { url: Setting::Set("http://localhost:7701".into()), search_api_key: Setting::Set("foo".into()), write_api_key: Setting::Set("bar".into()) },
|
||||
"ms-2": Remote { url: Setting::Set("http://localhost:7702".into()), search_api_key: Setting::Set("bar".into()), write_api_key: Setting::Set("foo".into()) },
|
||||
}))]
|
||||
#[deserr(default, error = DeserrJsonError<InvalidNetworkUrl>)]
|
||||
#[serde(default)]
|
||||
@ -99,6 +100,10 @@ pub struct Remote {
|
||||
#[deserr(default, error = DeserrJsonError<InvalidNetworkSearchApiKey>)]
|
||||
#[serde(default)]
|
||||
pub search_api_key: Setting<String>,
|
||||
#[schema(value_type = Option<String>, example = json!("XWnBI8QHUc-4IlqbKPLUDuhftNq19mQtjc6JvmivzJU"))]
|
||||
#[deserr(default, error = DeserrJsonError<InvalidNetworkWriteApiKey>)]
|
||||
#[serde(default)]
|
||||
pub write_api_key: Setting<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserr, ToSchema, Serialize)]
|
||||
@ -114,6 +119,10 @@ pub struct Network {
|
||||
#[serde(default, rename = "self")]
|
||||
#[deserr(default, rename = "self", error = DeserrJsonError<InvalidNetworkSelf>)]
|
||||
pub local: Setting<String>,
|
||||
#[schema(value_type = Option<bool>, example = json!(true))]
|
||||
#[serde(default)]
|
||||
#[deserr(default, error = DeserrJsonError<InvalidNetworkSharding>)]
|
||||
pub sharding: Setting<bool>,
|
||||
}
|
||||
|
||||
impl Remote {
|
||||
@ -136,6 +145,7 @@ impl Remote {
|
||||
Ok(url)
|
||||
})?,
|
||||
search_api_key: self.search_api_key.set(),
|
||||
write_api_key: self.write_api_key.set(),
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -174,9 +184,9 @@ impl Aggregate for PatchNetworkAnalytics {
|
||||
{
|
||||
"self": "ms-0",
|
||||
"remotes": {
|
||||
"ms-0": Remote { url: Setting::Set("http://localhost:7700".into()), search_api_key: Setting::Reset },
|
||||
"ms-1": Remote { url: Setting::Set("http://localhost:7701".into()), search_api_key: Setting::Set("foo".into()) },
|
||||
"ms-2": Remote { url: Setting::Set("http://localhost:7702".into()), search_api_key: Setting::Set("bar".into()) },
|
||||
"ms-0": Remote { url: Setting::Set("http://localhost:7700".into()), search_api_key: Setting::Reset, write_api_key: Setting::Reset },
|
||||
"ms-1": Remote { url: Setting::Set("http://localhost:7701".into()), search_api_key: Setting::Set("foo".into()), write_api_key: Setting::Set("bar".into()) },
|
||||
"ms-2": Remote { url: Setting::Set("http://localhost:7702".into()), search_api_key: Setting::Set("bar".into()), write_api_key: Setting::Set("foo".into()) },
|
||||
}
|
||||
})),
|
||||
(status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json", example = json!(
|
||||
@ -207,6 +217,19 @@ async fn patch_network(
|
||||
Setting::NotSet => old_network.local,
|
||||
};
|
||||
|
||||
let merged_sharding = match new_network.sharding {
|
||||
Setting::Set(new_sharding) => new_sharding,
|
||||
Setting::Reset => false,
|
||||
Setting::NotSet => old_network.sharding,
|
||||
};
|
||||
|
||||
if merged_sharding && merged_self.is_none() {
|
||||
return Err(ResponseError::from_msg(
|
||||
"`.sharding`: enabling the sharding requires `.self` to be set\n - Hint: Disable `sharding` or set `self` to a value.".into(),
|
||||
meilisearch_types::error::Code::InvalidNetworkSharding,
|
||||
));
|
||||
}
|
||||
|
||||
let merged_remotes = match new_network.remotes {
|
||||
Setting::Set(new_remotes) => {
|
||||
let mut merged_remotes = BTreeMap::new();
|
||||
@ -217,9 +240,17 @@ async fn patch_network(
|
||||
{
|
||||
match either_or_both {
|
||||
EitherOrBoth::Both((key, old), (_, Some(new))) => {
|
||||
let DbRemote { url: old_url, search_api_key: old_search_api_key } = old;
|
||||
let DbRemote {
|
||||
url: old_url,
|
||||
search_api_key: old_search_api_key,
|
||||
write_api_key: old_write_api_key,
|
||||
} = old;
|
||||
|
||||
let Remote { url: new_url, search_api_key: new_search_api_key } = new;
|
||||
let Remote {
|
||||
url: new_url,
|
||||
search_api_key: new_search_api_key,
|
||||
write_api_key: new_write_api_key,
|
||||
} = new;
|
||||
|
||||
let merged = DbRemote {
|
||||
url: match new_url {
|
||||
@ -247,6 +278,11 @@ async fn patch_network(
|
||||
Setting::Reset => None,
|
||||
Setting::NotSet => old_search_api_key,
|
||||
},
|
||||
write_api_key: match new_write_api_key {
|
||||
Setting::Set(new_write_api_key) => Some(new_write_api_key),
|
||||
Setting::Reset => None,
|
||||
Setting::NotSet => old_write_api_key,
|
||||
},
|
||||
};
|
||||
merged_remotes.insert(key, merged);
|
||||
}
|
||||
@ -274,7 +310,8 @@ async fn patch_network(
|
||||
&req,
|
||||
);
|
||||
|
||||
let merged_network = DbNetwork { local: merged_self, remotes: merged_remotes };
|
||||
let merged_network =
|
||||
DbNetwork { local: merged_self, remotes: merged_remotes, sharding: merged_sharding };
|
||||
index_scheduler.put_network(merged_network.clone())?;
|
||||
debug!(returns = ?merged_network, "Patch network");
|
||||
Ok(HttpResponse::Ok().json(merged_network))
|
||||
|
@ -9,8 +9,8 @@ use std::vec::{IntoIter, Vec};
|
||||
use actix_http::StatusCode;
|
||||
use index_scheduler::{IndexScheduler, RoFeatures};
|
||||
use itertools::Itertools;
|
||||
use meilisearch_types::enterprise_edition::network::{Network, Remote};
|
||||
use meilisearch_types::error::ResponseError;
|
||||
use meilisearch_types::features::{Network, Remote};
|
||||
use meilisearch_types::milli::order_by_map::OrderByMap;
|
||||
use meilisearch_types::milli::score_details::{ScoreDetails, WeightedScoreValue};
|
||||
use meilisearch_types::milli::vector::Embedding;
|
||||
|
@ -1,6 +1,6 @@
|
||||
pub use error::ProxySearchError;
|
||||
use error::ReqwestErrorWithoutUrl;
|
||||
use meilisearch_types::features::Remote;
|
||||
use meilisearch_types::enterprise_edition::network::Remote;
|
||||
use rand::Rng as _;
|
||||
use reqwest::{Client, Response, StatusCode};
|
||||
use serde::de::DeserializeOwned;
|
||||
|
@ -46,7 +46,7 @@ async fn errors_on_param() {
|
||||
meili_snap::snapshot!(code, @"400 Bad Request");
|
||||
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
|
||||
{
|
||||
"message": "Unknown field `selfie`: expected one of `remotes`, `self`",
|
||||
"message": "Unknown field `selfie`: expected one of `remotes`, `self`, `sharding`",
|
||||
"code": "bad_request",
|
||||
"type": "invalid_request",
|
||||
"link": "https://docs.meilisearch.com/errors#bad_request"
|
||||
@ -149,7 +149,7 @@ async fn errors_on_param() {
|
||||
meili_snap::snapshot!(code, @"400 Bad Request");
|
||||
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
|
||||
{
|
||||
"message": "Unknown field `doggo` inside `.remotes.new`: expected one of `url`, `searchApiKey`",
|
||||
"message": "Unknown field `doggo` inside `.remotes.new`: expected one of `url`, `searchApiKey`, `writeApiKey`",
|
||||
"code": "invalid_network_remotes",
|
||||
"type": "invalid_request",
|
||||
"link": "https://docs.meilisearch.com/errors#invalid_network_remotes"
|
||||
@ -192,9 +192,11 @@ async fn errors_on_param() {
|
||||
"remotes": {
|
||||
"kefir": {
|
||||
"url": "http://localhost:7700",
|
||||
"searchApiKey": null
|
||||
"searchApiKey": null,
|
||||
"writeApiKey": null
|
||||
}
|
||||
}
|
||||
},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
let (response, code) = server
|
||||
@ -266,7 +268,8 @@ async fn auth() {
|
||||
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
|
||||
{
|
||||
"self": "master",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
@ -274,11 +277,12 @@ async fn auth() {
|
||||
|
||||
meili_snap::snapshot!(code, @"200 OK");
|
||||
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
|
||||
{
|
||||
"self": "master",
|
||||
"remotes": {}
|
||||
}
|
||||
"###);
|
||||
{
|
||||
"self": "master",
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
// try get with get permission
|
||||
server.use_api_key(get_network_key.as_str().unwrap());
|
||||
@ -286,11 +290,12 @@ async fn auth() {
|
||||
|
||||
meili_snap::snapshot!(code, @"200 OK");
|
||||
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
|
||||
{
|
||||
"self": "master",
|
||||
"remotes": {}
|
||||
}
|
||||
"###);
|
||||
{
|
||||
"self": "master",
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
// try update with update permission
|
||||
server.use_api_key(update_network_key.as_str().unwrap());
|
||||
@ -303,11 +308,12 @@ async fn auth() {
|
||||
|
||||
meili_snap::snapshot!(code, @"200 OK");
|
||||
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
|
||||
{
|
||||
"self": "api_key",
|
||||
"remotes": {}
|
||||
}
|
||||
"###);
|
||||
{
|
||||
"self": "api_key",
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
// try with the other's permission
|
||||
let (response, code) = server.get_network().await;
|
||||
@ -383,7 +389,8 @@ async fn get_and_set_network() {
|
||||
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
|
||||
{
|
||||
"self": null,
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
@ -393,7 +400,8 @@ async fn get_and_set_network() {
|
||||
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
|
||||
{
|
||||
"self": "myself",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
@ -417,13 +425,16 @@ async fn get_and_set_network() {
|
||||
"remotes": {
|
||||
"myself": {
|
||||
"url": "http://localhost:7700",
|
||||
"searchApiKey": null
|
||||
"searchApiKey": null,
|
||||
"writeApiKey": null
|
||||
},
|
||||
"thy": {
|
||||
"url": "http://localhost:7701",
|
||||
"searchApiKey": "foo"
|
||||
"searchApiKey": "foo",
|
||||
"writeApiKey": null
|
||||
}
|
||||
}
|
||||
},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
@ -443,13 +454,16 @@ async fn get_and_set_network() {
|
||||
"remotes": {
|
||||
"myself": {
|
||||
"url": "http://localhost:7700",
|
||||
"searchApiKey": null
|
||||
"searchApiKey": null,
|
||||
"writeApiKey": null
|
||||
},
|
||||
"thy": {
|
||||
"url": "http://localhost:7701",
|
||||
"searchApiKey": "bar"
|
||||
"searchApiKey": "bar",
|
||||
"writeApiKey": null
|
||||
}
|
||||
}
|
||||
},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
@ -470,17 +484,21 @@ async fn get_and_set_network() {
|
||||
"remotes": {
|
||||
"myself": {
|
||||
"url": "http://localhost:7700",
|
||||
"searchApiKey": null
|
||||
"searchApiKey": null,
|
||||
"writeApiKey": null
|
||||
},
|
||||
"them": {
|
||||
"url": "http://localhost:7702",
|
||||
"searchApiKey": "baz"
|
||||
"searchApiKey": "baz",
|
||||
"writeApiKey": null
|
||||
},
|
||||
"thy": {
|
||||
"url": "http://localhost:7701",
|
||||
"searchApiKey": "bar"
|
||||
"searchApiKey": "bar",
|
||||
"writeApiKey": null
|
||||
}
|
||||
}
|
||||
},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
@ -498,13 +516,16 @@ async fn get_and_set_network() {
|
||||
"remotes": {
|
||||
"them": {
|
||||
"url": "http://localhost:7702",
|
||||
"searchApiKey": "baz"
|
||||
"searchApiKey": "baz",
|
||||
"writeApiKey": null
|
||||
},
|
||||
"thy": {
|
||||
"url": "http://localhost:7701",
|
||||
"searchApiKey": "bar"
|
||||
"searchApiKey": "bar",
|
||||
"writeApiKey": null
|
||||
}
|
||||
}
|
||||
},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
@ -518,13 +539,16 @@ async fn get_and_set_network() {
|
||||
"remotes": {
|
||||
"them": {
|
||||
"url": "http://localhost:7702",
|
||||
"searchApiKey": "baz"
|
||||
"searchApiKey": "baz",
|
||||
"writeApiKey": null
|
||||
},
|
||||
"thy": {
|
||||
"url": "http://localhost:7701",
|
||||
"searchApiKey": "bar"
|
||||
"searchApiKey": "bar",
|
||||
"writeApiKey": null
|
||||
}
|
||||
}
|
||||
},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
@ -538,13 +562,16 @@ async fn get_and_set_network() {
|
||||
"remotes": {
|
||||
"them": {
|
||||
"url": "http://localhost:7702",
|
||||
"searchApiKey": "baz"
|
||||
"searchApiKey": "baz",
|
||||
"writeApiKey": null
|
||||
},
|
||||
"thy": {
|
||||
"url": "http://localhost:7701",
|
||||
"searchApiKey": "bar"
|
||||
"searchApiKey": "bar",
|
||||
"writeApiKey": null
|
||||
}
|
||||
}
|
||||
},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
@ -553,60 +580,69 @@ async fn get_and_set_network() {
|
||||
|
||||
meili_snap::snapshot!(code, @"200 OK");
|
||||
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
|
||||
{
|
||||
"self": "thy",
|
||||
"remotes": {
|
||||
"them": {
|
||||
"url": "http://localhost:7702",
|
||||
"searchApiKey": "baz"
|
||||
},
|
||||
"thy": {
|
||||
"url": "http://localhost:7701",
|
||||
"searchApiKey": "bar"
|
||||
}
|
||||
}
|
||||
{
|
||||
"self": "thy",
|
||||
"remotes": {
|
||||
"them": {
|
||||
"url": "http://localhost:7702",
|
||||
"searchApiKey": "baz",
|
||||
"writeApiKey": null
|
||||
},
|
||||
"thy": {
|
||||
"url": "http://localhost:7701",
|
||||
"searchApiKey": "bar",
|
||||
"writeApiKey": null
|
||||
}
|
||||
"###);
|
||||
},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
// still doing nothing
|
||||
let (response, code) = server.set_network(json!({"remotes": {}})).await;
|
||||
|
||||
meili_snap::snapshot!(code, @"200 OK");
|
||||
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
|
||||
{
|
||||
"self": "thy",
|
||||
"remotes": {
|
||||
"them": {
|
||||
"url": "http://localhost:7702",
|
||||
"searchApiKey": "baz"
|
||||
},
|
||||
"thy": {
|
||||
"url": "http://localhost:7701",
|
||||
"searchApiKey": "bar"
|
||||
}
|
||||
}
|
||||
{
|
||||
"self": "thy",
|
||||
"remotes": {
|
||||
"them": {
|
||||
"url": "http://localhost:7702",
|
||||
"searchApiKey": "baz",
|
||||
"writeApiKey": null
|
||||
},
|
||||
"thy": {
|
||||
"url": "http://localhost:7701",
|
||||
"searchApiKey": "bar",
|
||||
"writeApiKey": null
|
||||
}
|
||||
"###);
|
||||
},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
// good time to check GET
|
||||
let (response, code) = server.get_network().await;
|
||||
|
||||
meili_snap::snapshot!(code, @"200 OK");
|
||||
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
|
||||
{
|
||||
"self": "thy",
|
||||
"remotes": {
|
||||
"them": {
|
||||
"url": "http://localhost:7702",
|
||||
"searchApiKey": "baz"
|
||||
},
|
||||
"thy": {
|
||||
"url": "http://localhost:7701",
|
||||
"searchApiKey": "bar"
|
||||
}
|
||||
}
|
||||
{
|
||||
"self": "thy",
|
||||
"remotes": {
|
||||
"them": {
|
||||
"url": "http://localhost:7702",
|
||||
"searchApiKey": "baz",
|
||||
"writeApiKey": null
|
||||
},
|
||||
"thy": {
|
||||
"url": "http://localhost:7701",
|
||||
"searchApiKey": "bar",
|
||||
"writeApiKey": null
|
||||
}
|
||||
"###);
|
||||
},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
// deleting everything
|
||||
let (response, code) = server
|
||||
@ -619,7 +655,8 @@ async fn get_and_set_network() {
|
||||
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
|
||||
{
|
||||
"self": "thy",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
}
|
||||
|
@ -132,7 +132,8 @@ async fn remote_sharding() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms0",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
|
||||
@ -140,7 +141,8 @@ async fn remote_sharding() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms1",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
let (response, code) = ms2.set_network(json!({"self": "ms2"})).await;
|
||||
@ -148,7 +150,8 @@ async fn remote_sharding() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms2",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
@ -444,7 +447,8 @@ async fn remote_sharding_retrieve_vectors() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms0",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
|
||||
@ -452,7 +456,8 @@ async fn remote_sharding_retrieve_vectors() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms1",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
let (response, code) = ms2.set_network(json!({"self": "ms2"})).await;
|
||||
@ -460,7 +465,8 @@ async fn remote_sharding_retrieve_vectors() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms2",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
@ -934,7 +940,8 @@ async fn error_unregistered_remote() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms0",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
|
||||
@ -942,7 +949,8 @@ async fn error_unregistered_remote() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms1",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
@ -1052,7 +1060,8 @@ async fn error_no_weighted_score() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms0",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
|
||||
@ -1060,7 +1069,8 @@ async fn error_no_weighted_score() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms1",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
@ -1185,7 +1195,8 @@ async fn error_bad_response() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms0",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
|
||||
@ -1193,7 +1204,8 @@ async fn error_bad_response() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms1",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
@ -1322,7 +1334,8 @@ async fn error_bad_request() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms0",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
|
||||
@ -1330,7 +1343,8 @@ async fn error_bad_request() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms1",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
@ -1452,7 +1466,8 @@ async fn error_bad_request_facets_by_index() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms0",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
|
||||
@ -1460,7 +1475,8 @@ async fn error_bad_request_facets_by_index() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms1",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
@ -1593,7 +1609,8 @@ async fn error_bad_request_facets_by_index_facet() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms0",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
|
||||
@ -1601,7 +1618,8 @@ async fn error_bad_request_facets_by_index_facet() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms1",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
@ -1743,7 +1761,8 @@ async fn error_remote_does_not_answer() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms0",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
|
||||
@ -1751,7 +1770,8 @@ async fn error_remote_does_not_answer() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms1",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
@ -1944,7 +1964,8 @@ async fn error_remote_404() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms0",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
|
||||
@ -1952,7 +1973,8 @@ async fn error_remote_404() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms1",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
@ -2139,7 +2161,8 @@ async fn error_remote_sharding_auth() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms0",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
|
||||
@ -2147,7 +2170,8 @@ async fn error_remote_sharding_auth() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms1",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
@ -2299,7 +2323,8 @@ async fn remote_sharding_auth() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms0",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
|
||||
@ -2307,7 +2332,8 @@ async fn remote_sharding_auth() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms1",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
@ -2454,7 +2480,8 @@ async fn error_remote_500() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms0",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
|
||||
@ -2462,7 +2489,8 @@ async fn error_remote_500() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms1",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
@ -2633,7 +2661,8 @@ async fn error_remote_500_once() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms0",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
|
||||
@ -2641,7 +2670,8 @@ async fn error_remote_500_once() {
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"self": "ms1",
|
||||
"remotes": {}
|
||||
"remotes": {},
|
||||
"sharding": false
|
||||
}
|
||||
"###);
|
||||
|
||||
|
@ -109,6 +109,7 @@ utoipa = { version = "5.4.0", features = [
|
||||
"openapi_extensions",
|
||||
] }
|
||||
lru = "0.14.0"
|
||||
twox-hash = { version = "2.1.1", default-features = false, features = ["std", "xxhash3_64", "xxhash64"] }
|
||||
|
||||
[dev-dependencies]
|
||||
mimalloc = { version = "0.1.47", default-features = false }
|
||||
|
@ -76,6 +76,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
@ -84,6 +84,7 @@ impl TempIndex {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)?;
|
||||
|
||||
if let Some(error) = operation_stats.into_iter().find_map(|stat| stat.error) {
|
||||
@ -167,6 +168,7 @@ impl TempIndex {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)?;
|
||||
|
||||
if let Some(error) = operation_stats.into_iter().find_map(|stat| stat.error) {
|
||||
@ -242,6 +244,7 @@ fn aborting_indexation() {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
@ -1977,6 +1977,7 @@ mod tests {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -2029,6 +2030,7 @@ mod tests {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -2117,6 +2119,7 @@ mod tests {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -2306,6 +2309,7 @@ mod tests {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -2369,6 +2373,7 @@ mod tests {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -2423,6 +2428,7 @@ mod tests {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -2476,6 +2482,7 @@ mod tests {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -2531,6 +2538,7 @@ mod tests {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -2591,6 +2599,7 @@ mod tests {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -2644,6 +2653,7 @@ mod tests {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -2697,6 +2707,7 @@ mod tests {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -2908,6 +2919,7 @@ mod tests {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -2968,6 +2980,7 @@ mod tests {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -3025,6 +3038,7 @@ mod tests {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
@ -17,6 +17,7 @@ use super::guess_primary_key::retrieve_or_guess_primary_key;
|
||||
use crate::documents::PrimaryKey;
|
||||
use crate::progress::{AtomicPayloadStep, Progress};
|
||||
use crate::update::new::document::{DocumentContext, Versions};
|
||||
use crate::update::new::indexer::enterprise_edition::sharding::Shards;
|
||||
use crate::update::new::steps::IndexingStep;
|
||||
use crate::update::new::thread_local::MostlySend;
|
||||
use crate::update::new::{DocumentIdentifiers, Insertion, Update};
|
||||
@ -71,6 +72,7 @@ impl<'pl> DocumentOperation<'pl> {
|
||||
new_fields_ids_map: &mut FieldsIdsMap,
|
||||
must_stop_processing: &MSP,
|
||||
progress: Progress,
|
||||
shards: Option<&Shards>,
|
||||
) -> Result<(DocumentOperationChanges<'pl>, Vec<PayloadStats>, Option<PrimaryKey<'pl>>)>
|
||||
where
|
||||
MSP: Fn() -> bool,
|
||||
@ -107,6 +109,7 @@ impl<'pl> DocumentOperation<'pl> {
|
||||
&mut bytes,
|
||||
&docids_version_offsets,
|
||||
IndexDocumentsMethod::ReplaceDocuments,
|
||||
shards,
|
||||
payload,
|
||||
),
|
||||
Payload::Update(payload) => extract_addition_payload_changes(
|
||||
@ -120,6 +123,7 @@ impl<'pl> DocumentOperation<'pl> {
|
||||
&mut bytes,
|
||||
&docids_version_offsets,
|
||||
IndexDocumentsMethod::UpdateDocuments,
|
||||
shards,
|
||||
payload,
|
||||
),
|
||||
Payload::Deletion(to_delete) => extract_deletion_payload_changes(
|
||||
@ -127,6 +131,7 @@ impl<'pl> DocumentOperation<'pl> {
|
||||
rtxn,
|
||||
&mut available_docids,
|
||||
&docids_version_offsets,
|
||||
shards,
|
||||
to_delete,
|
||||
),
|
||||
};
|
||||
@ -173,6 +178,7 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
|
||||
bytes: &mut u64,
|
||||
main_docids_version_offsets: &hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>,
|
||||
method: IndexDocumentsMethod,
|
||||
shards: Option<&Shards>,
|
||||
payload: &'pl [u8],
|
||||
) -> Result<hashbrown::HashMap<&'pl str, PayloadOperations<'pl>>> {
|
||||
use IndexDocumentsMethod::{ReplaceDocuments, UpdateDocuments};
|
||||
@ -210,12 +216,20 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
|
||||
primary_key.as_ref().unwrap()
|
||||
};
|
||||
|
||||
let current_offset = iter.byte_offset();
|
||||
let content = &payload[previous_offset..current_offset];
|
||||
previous_offset = current_offset;
|
||||
|
||||
let external_id =
|
||||
retrieved_primary_key.extract_fields_and_docid(doc, new_fields_ids_map, indexer)?;
|
||||
|
||||
let external_id = external_id.to_de();
|
||||
let current_offset = iter.byte_offset();
|
||||
let document_offset = DocumentOffset { content: &payload[previous_offset..current_offset] };
|
||||
|
||||
if shards.is_some_and(|shards| !shards.must_process(external_id)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let document_offset = DocumentOffset { content };
|
||||
|
||||
match main_docids_version_offsets.get(external_id) {
|
||||
None => {
|
||||
@ -299,8 +313,6 @@ fn extract_addition_payload_changes<'r, 'pl: 'r>(
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
previous_offset = iter.byte_offset();
|
||||
}
|
||||
|
||||
if payload.is_empty() {
|
||||
@ -329,11 +341,16 @@ fn extract_deletion_payload_changes<'s, 'pl: 's>(
|
||||
rtxn: &RoTxn,
|
||||
available_docids: &mut AvailableIds,
|
||||
main_docids_version_offsets: &hashbrown::HashMap<&'s str, PayloadOperations<'pl>>,
|
||||
shards: Option<&Shards>,
|
||||
to_delete: &'pl [&'pl str],
|
||||
) -> Result<hashbrown::HashMap<&'s str, PayloadOperations<'pl>>> {
|
||||
let mut new_docids_version_offsets = hashbrown::HashMap::<&str, PayloadOperations<'pl>>::new();
|
||||
|
||||
for external_id in to_delete {
|
||||
if shards.is_some_and(|shards| !shards.must_process(external_id)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
match main_docids_version_offsets.get(external_id) {
|
||||
None => {
|
||||
match index.external_documents_ids().get(rtxn, external_id) {
|
||||
|
@ -0,0 +1,6 @@
|
||||
// Copyright © 2025 Meilisearch Some Rights Reserved
|
||||
// This file is part of Meilisearch Enterprise Edition (EE).
|
||||
// Use of this source code is governed by the Business Source License 1.1,
|
||||
// as found in the LICENSE-EE file or at <https://mariadb.com/bsl11>
|
||||
|
||||
pub mod sharding;
|
@ -0,0 +1,22 @@
|
||||
// Copyright © 2025 Meilisearch Some Rights Reserved
|
||||
// This file is part of Meilisearch Enterprise Edition (EE).
|
||||
// Use of this source code is governed by the Business Source License 1.1,
|
||||
// as found in the LICENSE-EE file or at <https://mariadb.com/bsl11>
|
||||
|
||||
use std::hash::{BuildHasher as _, BuildHasherDefault};
|
||||
|
||||
pub struct Shards {
|
||||
pub own: Vec<String>,
|
||||
pub others: Vec<String>,
|
||||
}
|
||||
|
||||
impl Shards {
|
||||
pub fn must_process(&self, docid: &str) -> bool {
|
||||
let hasher = BuildHasherDefault::<twox_hash::XxHash3_64>::new();
|
||||
let to_hash = |shard: &String| hasher.hash_one((shard, docid));
|
||||
|
||||
let max_hash = self.others.iter().map(to_hash).max().unwrap_or_default();
|
||||
|
||||
self.own.iter().map(to_hash).any(|hash| hash > max_hash)
|
||||
}
|
||||
}
|
@ -31,6 +31,7 @@ pub(crate) mod de;
|
||||
pub mod document_changes;
|
||||
mod document_deletion;
|
||||
mod document_operation;
|
||||
pub mod enterprise_edition;
|
||||
mod extract;
|
||||
mod guess_primary_key;
|
||||
mod partial_dump;
|
||||
|
@ -59,6 +59,7 @@ fn test_facet_distribution_with_no_facet_values() {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
@ -97,6 +97,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
@ -329,6 +329,7 @@ fn criteria_ascdesc() {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
@ -138,6 +138,7 @@ fn test_typo_disabled_on_word() {
|
||||
&mut new_fields_ids_map,
|
||||
&|| false,
|
||||
Progress::default(),
|
||||
None,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
|
Reference in New Issue
Block a user