Use the edition 2024 documents indexer in the dumps

This commit is contained in:
Clément Renault
2025-07-17 17:12:42 +02:00
parent 760ccffdbd
commit d67db6e3c2
3 changed files with 56 additions and 4 deletions

View File

@ -50,6 +50,7 @@ jsonwebtoken = "9.3.1"
lazy_static = "1.5.0"
meilisearch-auth = { path = "../meilisearch-auth" }
meilisearch-types = { path = "../meilisearch-types" }
memmap2 = "0.9.7"
mimalloc = { version = "0.1.47", default-features = false }
mime = "0.3.17"
num_cpus = "1.17.0"

View File

@ -30,6 +30,7 @@ use actix_web::web::Data;
use actix_web::{web, HttpRequest};
use analytics::Analytics;
use anyhow::bail;
use bumpalo::Bump;
use error::PayloadError;
use extractors::payload::PayloadConfig;
use index_scheduler::versioning::Versioning;
@ -38,6 +39,7 @@ use meilisearch_auth::{open_auth_store_env, AuthController};
use meilisearch_types::milli::constants::VERSION_MAJOR;
use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader};
use meilisearch_types::milli::progress::{EmbedderStats, Progress};
use meilisearch_types::milli::update::new::indexer;
use meilisearch_types::milli::update::{
default_thread_pool_and_threads, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig,
};
@ -534,7 +536,7 @@ fn import_dump(
let mut index_reader = index_reader?;
let metadata = index_reader.metadata();
let uid = metadata.uid.clone();
tracing::info!("Importing index `{}`.", metadata.uid);
tracing::info!("Importing index `{uid}`.");
let date = Some((metadata.created_at, metadata.updated_at));
let index = index_scheduler.create_raw_index(&metadata.uid, date)?;
@ -553,6 +555,10 @@ fn import_dump(
apply_settings_to_builder(&settings, &mut builder);
let embedder_stats: Arc<EmbedderStats> = Default::default();
builder.execute(&|| false, &progress, embedder_stats.clone())?;
wtxn.commit()?;
let mut wtxn = index.write_txn()?;
let rtxn = index.read_txn()?;
if index_scheduler.no_edition_2024_for_dumps() {
// 5.3 Import the documents.
@ -594,7 +600,51 @@ fn import_dump(
tracing::info!(documents_found = user_result, "{} documents found.", user_result);
builder.execute()?;
} else {
unimplemented!("new document indexer when importing dumps");
let db_fields_ids_map = index.fields_ids_map(&rtxn)?;
let primary_key = index.primary_key(&rtxn)?;
let mut new_fields_ids_map = db_fields_ids_map.clone();
let mut indexer = indexer::DocumentOperation::new();
let embedders = index.embedding_configs().embedding_configs(&mut wtxn)?;
let embedders = index_scheduler.embedders(uid.clone(), embedders)?;
let mmap = unsafe { memmap2::Mmap::map(index_reader.documents_file())? };
indexer.replace_documents(&mmap)?;
let indexer_config = index_scheduler.indexer_config();
let pool = &indexer_config.thread_pool;
let indexer_alloc = Bump::new();
let (document_changes, mut operation_stats, primary_key) = indexer.into_changes(
&indexer_alloc,
&index,
&rtxn,
primary_key,
&mut new_fields_ids_map,
&|| false, // never stop processing a dump
progress.clone(),
)?;
let operation_stats = operation_stats.pop().unwrap();
if let Some(error) = operation_stats.error {
return Err(error.into());
}
let _congestion = indexer::index(
&mut wtxn,
&index,
pool,
indexer_config.grenad_parameters(),
&db_fields_ids_map,
new_fields_ids_map,
primary_key,
&document_changes,
embedders,
&|| false, // never stop processing a dump
&progress,
&embedder_stats,
)?;
}
wtxn.commit()?;