From d67db6e3c2a97acb956c55ccab4f38b1dd988212 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 17 Jul 2025 17:12:42 +0200 Subject: [PATCH] Use the edition 2024 documents indexer in the dumps --- Cargo.lock | 5 ++-- crates/meilisearch/Cargo.toml | 1 + crates/meilisearch/src/lib.rs | 54 +++++++++++++++++++++++++++++++++-- 3 files changed, 56 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ceec0a05e..8413b3d14 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3775,6 +3775,7 @@ dependencies = [ "meili-snap", "meilisearch-auth", "meilisearch-types", + "memmap2", "mimalloc", "mime", "mopa-maintained", @@ -3908,9 +3909,9 @@ checksum = "32a282da65faaf38286cf3be983213fcf1d2e2a58700e808f83f4ea9a4804bc0" [[package]] name = "memmap2" -version = "0.9.5" +version = "0.9.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd3f7eed9d3848f8b98834af67102b720745c4ec028fcd0aa0239277e7de374f" +checksum = "483758ad303d734cec05e5c12b41d7e93e6a6390c5e9dae6bdeb7c1259012d28" dependencies = [ "libc", "stable_deref_trait", diff --git a/crates/meilisearch/Cargo.toml b/crates/meilisearch/Cargo.toml index 83eb439d9..21f6b58e5 100644 --- a/crates/meilisearch/Cargo.toml +++ b/crates/meilisearch/Cargo.toml @@ -50,6 +50,7 @@ jsonwebtoken = "9.3.1" lazy_static = "1.5.0" meilisearch-auth = { path = "../meilisearch-auth" } meilisearch-types = { path = "../meilisearch-types" } +memmap2 = "0.9.7" mimalloc = { version = "0.1.47", default-features = false } mime = "0.3.17" num_cpus = "1.17.0" diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs index 8907a5632..57a20a633 100644 --- a/crates/meilisearch/src/lib.rs +++ b/crates/meilisearch/src/lib.rs @@ -30,6 +30,7 @@ use actix_web::web::Data; use actix_web::{web, HttpRequest}; use analytics::Analytics; use anyhow::bail; +use bumpalo::Bump; use error::PayloadError; use extractors::payload::PayloadConfig; use index_scheduler::versioning::Versioning; @@ -38,6 +39,7 @@ use meilisearch_auth::{open_auth_store_env, AuthController}; use meilisearch_types::milli::constants::VERSION_MAJOR; use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader}; use meilisearch_types::milli::progress::{EmbedderStats, Progress}; +use meilisearch_types::milli::update::new::indexer; use meilisearch_types::milli::update::{ default_thread_pool_and_threads, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, }; @@ -534,7 +536,7 @@ fn import_dump( let mut index_reader = index_reader?; let metadata = index_reader.metadata(); let uid = metadata.uid.clone(); - tracing::info!("Importing index `{}`.", metadata.uid); + tracing::info!("Importing index `{uid}`."); let date = Some((metadata.created_at, metadata.updated_at)); let index = index_scheduler.create_raw_index(&metadata.uid, date)?; @@ -553,6 +555,10 @@ fn import_dump( apply_settings_to_builder(&settings, &mut builder); let embedder_stats: Arc = Default::default(); builder.execute(&|| false, &progress, embedder_stats.clone())?; + wtxn.commit()?; + + let mut wtxn = index.write_txn()?; + let rtxn = index.read_txn()?; if index_scheduler.no_edition_2024_for_dumps() { // 5.3 Import the documents. @@ -594,7 +600,51 @@ fn import_dump( tracing::info!(documents_found = user_result, "{} documents found.", user_result); builder.execute()?; } else { - unimplemented!("new document indexer when importing dumps"); + let db_fields_ids_map = index.fields_ids_map(&rtxn)?; + let primary_key = index.primary_key(&rtxn)?; + let mut new_fields_ids_map = db_fields_ids_map.clone(); + + let mut indexer = indexer::DocumentOperation::new(); + let embedders = index.embedding_configs().embedding_configs(&mut wtxn)?; + let embedders = index_scheduler.embedders(uid.clone(), embedders)?; + + let mmap = unsafe { memmap2::Mmap::map(index_reader.documents_file())? }; + + indexer.replace_documents(&mmap)?; + + let indexer_config = index_scheduler.indexer_config(); + let pool = &indexer_config.thread_pool; + + let indexer_alloc = Bump::new(); + let (document_changes, mut operation_stats, primary_key) = indexer.into_changes( + &indexer_alloc, + &index, + &rtxn, + primary_key, + &mut new_fields_ids_map, + &|| false, // never stop processing a dump + progress.clone(), + )?; + + let operation_stats = operation_stats.pop().unwrap(); + if let Some(error) = operation_stats.error { + return Err(error.into()); + } + + let _congestion = indexer::index( + &mut wtxn, + &index, + pool, + indexer_config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + primary_key, + &document_changes, + embedders, + &|| false, // never stop processing a dump + &progress, + &embedder_stats, + )?; } wtxn.commit()?;