Introduce a new CLI flag and env var to use the old document indexer when importing dumps
Clément Renault
2025-07-17 16:12:23 +02:00
parent fd8b2451d7
commit fe15e11c9d
4 changed files with 73 additions and 37 deletions

crates/index-scheduler/src/lib.rs

@@ -139,6 +139,8 @@ pub struct IndexSchedulerOptions {
     pub embedding_cache_cap: usize,
     /// Snapshot compaction status.
     pub experimental_no_snapshot_compaction: bool,
+    /// Whether dump imports use the old document indexer or the new one.
+    pub experimental_no_edition_2024_for_dumps: bool,
 }

 /// Structure which holds meilisearch's indexes and schedules the tasks
@@ -168,6 +170,9 @@ pub struct IndexScheduler {
     /// Whether we should automatically cleanup the task queue or not.
     pub(crate) cleanup_enabled: bool,
+
+    /// Whether we should use the old document indexer or the new one.
+    pub(crate) experimental_no_edition_2024_for_dumps: bool,
     /// The webhook url we should send tasks to after processing every batch.
     pub(crate) webhook_url: Option<String>,
     /// The Authorization header to send to the webhook URL.
@@ -210,6 +215,7 @@ impl IndexScheduler {
             index_mapper: self.index_mapper.clone(),
             cleanup_enabled: self.cleanup_enabled,
+            experimental_no_edition_2024_for_dumps: self.experimental_no_edition_2024_for_dumps,
             webhook_url: self.webhook_url.clone(),
             webhook_authorization_header: self.webhook_authorization_header.clone(),
             embedders: self.embedders.clone(),
@@ -296,6 +302,7 @@ impl IndexScheduler {
             index_mapper,
             env,
             cleanup_enabled: options.cleanup_enabled,
+            experimental_no_edition_2024_for_dumps: options.experimental_no_edition_2024_for_dumps,
             webhook_url: options.webhook_url,
             webhook_authorization_header: options.webhook_authorization_header,
             embedders: Default::default(),
@@ -594,6 +601,11 @@ impl IndexScheduler {
         Ok(nbr_index_processing_tasks > 0)
     }

+    /// Whether dump imports should use the old document indexer.
+    pub fn no_edition_2024_for_dumps(&self) -> bool {
+        self.experimental_no_edition_2024_for_dumps
+    }
+
     /// Return the tasks matching the query from the user's point of view along
     /// with the total number of tasks matching the query, ignoring from and limit.
     ///
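For orientation, the new option travels from the CLI into IndexSchedulerOptions, is copied onto the IndexScheduler, and is read back through the getter above. A minimal wiring sketch, with every pre-existing field of the options struct elided behind a comment (illustrative only, not compilable as-is):

    let options = IndexSchedulerOptions {
        // ...all other fields exactly as before...
        experimental_no_edition_2024_for_dumps: opt.experimental_no_edition_2024_for_dumps,
    };
    // Downstream code asks the scheduler instead of re-reading the CLI/env:
    let use_old_indexer = index_scheduler.no_edition_2024_for_dumps();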

crates/meilisearch/src/analytics/segment_analytics.rs

@@ -203,6 +203,7 @@ struct Infos {
     experimental_composite_embedders: bool,
     experimental_embedding_cache_entries: usize,
     experimental_no_snapshot_compaction: bool,
+    experimental_no_edition_2024_for_dumps: bool,
     experimental_no_edition_2024_for_settings: bool,
     gpu_enabled: bool,
     db_path: bool,
@@ -253,6 +254,7 @@ impl Infos {
             experimental_limit_batched_tasks_total_size,
             experimental_embedding_cache_entries,
             experimental_no_snapshot_compaction,
+            experimental_no_edition_2024_for_dumps,
             http_addr,
             master_key: _,
             env,
@@ -329,6 +331,7 @@ impl Infos {
             experimental_composite_embedders: composite_embedders,
             experimental_embedding_cache_entries,
             experimental_no_snapshot_compaction,
+            experimental_no_edition_2024_for_dumps,
             gpu_enabled: meilisearch_types::milli::vector::is_cuda_enabled(),
             db_path: db_path != PathBuf::from("./data.ms"),
             import_dump: import_dump.is_some(),
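Since Infos is the struct serialized into the telemetry payload, the new flag surfaces as a top-level boolean property named after the field. A runnable sketch of that mechanism using a stand-in struct (the real Infos is assumed to derive serde::Serialize, as its usage suggests):

    use serde::Serialize;

    // Stand-in for `Infos`, for illustration only; the real struct has many more fields.
    #[derive(Serialize)]
    struct InfosSketch {
        experimental_no_edition_2024_for_dumps: bool,
    }

    fn main() {
        let infos = InfosSketch { experimental_no_edition_2024_for_dumps: true };
        let payload = serde_json::to_value(&infos).unwrap();
        // serde keeps the snake_case field name, so analytics reports the
        // property as `experimental_no_edition_2024_for_dumps`.
        assert_eq!(payload["experimental_no_edition_2024_for_dumps"], true);
    }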

crates/meilisearch/src/lib.rs

@@ -238,6 +238,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
         auto_upgrade: opt.experimental_dumpless_upgrade,
         embedding_cache_cap: opt.experimental_embedding_cache_entries,
         experimental_no_snapshot_compaction: opt.experimental_no_snapshot_compaction,
+        experimental_no_edition_2024_for_dumps: opt.experimental_no_edition_2024_for_dumps,
     };

     let binary_version = (VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH);
@@ -553,47 +554,51 @@ fn import_dump(
     let embedder_stats: Arc<EmbedderStats> = Default::default();
     builder.execute(&|| false, &progress, embedder_stats.clone())?;

-    // 5.3 Import the documents.
-    // 5.3.1 We need to recreate the grenad+obkv format accepted by the index.
-    tracing::info!("Importing the documents.");
-    let file = tempfile::tempfile()?;
-    let mut builder = DocumentsBatchBuilder::new(BufWriter::new(file));
-    for document in index_reader.documents()? {
-        builder.append_json_object(&document?)?;
+    if index_scheduler.no_edition_2024_for_dumps() {
+        // 5.3 Import the documents.
+        // 5.3.1 We need to recreate the grenad+obkv format accepted by the index.
+        tracing::info!("Importing the documents.");
+        let file = tempfile::tempfile()?;
+        let mut builder = DocumentsBatchBuilder::new(BufWriter::new(file));
+        for document in index_reader.documents()? {
+            builder.append_json_object(&document?)?;
+        }
+
+        // This flushes the content of the batch builder.
+        let file = builder.into_inner()?.into_inner()?;
+
+        // 5.3.2 We feed it to the milli index.
+        let reader = BufReader::new(file);
+        let reader = DocumentsBatchReader::from_reader(reader)?;
+
+        let embedder_configs = index.embedding_configs().embedding_configs(&wtxn)?;
+        let embedders = index_scheduler.embedders(uid.to_string(), embedder_configs)?;
+
+        let builder = milli::update::IndexDocuments::new(
+            &mut wtxn,
+            &index,
+            indexer_config,
+            IndexDocumentsConfig {
+                update_method: IndexDocumentsMethod::ReplaceDocuments,
+                ..Default::default()
+            },
+            |indexing_step| tracing::trace!("update: {:?}", indexing_step),
+            || false,
+            &embedder_stats,
+        )?;
+
+        let builder = builder.with_embedders(embedders);
+        let (builder, user_result) = builder.add_documents(reader)?;
+        let user_result = user_result?;
+        tracing::info!(documents_found = user_result, "{} documents found.", user_result);
+        builder.execute()?;
+    } else {
+        unimplemented!("new document indexer when importing dumps");
     }
-    // This flush the content of the batch builder.
-    let file = builder.into_inner()?.into_inner()?;
-
-    // 5.3.2 We feed it to the milli index.
-    let reader = BufReader::new(file);
-    let reader = DocumentsBatchReader::from_reader(reader)?;
-
-    let embedder_configs = index.embedding_configs().embedding_configs(&wtxn)?;
-    let embedders = index_scheduler.embedders(uid.to_string(), embedder_configs)?;
-
-    let builder = milli::update::IndexDocuments::new(
-        &mut wtxn,
-        &index,
-        indexer_config,
-        IndexDocumentsConfig {
-            update_method: IndexDocumentsMethod::ReplaceDocuments,
-            ..Default::default()
-        },
-        |indexing_step| tracing::trace!("update: {:?}", indexing_step),
-        || false,
-        &embedder_stats,
-    )?;
-
-    let builder = builder.with_embedders(embedders);
-    let (builder, user_result) = builder.add_documents(reader)?;
-    let user_result = user_result?;
-    tracing::info!(documents_found = user_result, "{} documents found.", user_result);
-    builder.execute()?;

     wtxn.commit()?;
     tracing::info!("All documents successfully imported.");

     index_scheduler.refresh_index_stats(&uid)?;
 }
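Note the effect of the else branch at this commit: with the flag disabled (the default), a dump import reaches the unimplemented! stub and panics, so dump imports only succeed with the option enabled; the new-indexer path is presumably filled in by a follow-up commit. Assuming clap's usual kebab-case flag naming and the existing --import-dump option (the dump path below is a placeholder), the legacy path would be exercised like this:

    meilisearch --experimental-no-edition-2024-for-dumps --import-dump ./dump.dump
    # or, equivalently, via the environment variable:
    MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_DUMPS=true meilisearch --import-dump ./dump.dump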

crates/meilisearch/src/option.rs

@@ -68,6 +68,8 @@ const MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE: &str =
 const MEILI_EXPERIMENTAL_EMBEDDING_CACHE_ENTRIES: &str =
     "MEILI_EXPERIMENTAL_EMBEDDING_CACHE_ENTRIES";
 const MEILI_EXPERIMENTAL_NO_SNAPSHOT_COMPACTION: &str = "MEILI_EXPERIMENTAL_NO_SNAPSHOT_COMPACTION";
+const MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_DUMPS: &str =
+    "MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_DUMPS";
 const DEFAULT_CONFIG_FILE_PATH: &str = "./config.toml";
 const DEFAULT_DB_PATH: &str = "./data.ms";
 const DEFAULT_HTTP_ADDR: &str = "localhost:7700";
@@ -467,6 +469,15 @@ pub struct Opt {
     #[serde(default)]
     pub experimental_no_snapshot_compaction: bool,

+    /// Experimental: make dump imports use the old document indexer.
+    ///
+    /// When enabled, Meilisearch will use the old document indexer when importing dumps.
+    ///
+    /// For more information, see <https://github.com/orgs/meilisearch/discussions/851>.
+    #[clap(long, env = MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_DUMPS)]
+    #[serde(default)]
+    pub experimental_no_edition_2024_for_dumps: bool,
+
     #[serde(flatten)]
     #[clap(flatten)]
     pub indexer_options: IndexerOpts,
@@ -572,6 +583,7 @@ impl Opt {
             experimental_limit_batched_tasks_total_size,
             experimental_embedding_cache_entries,
             experimental_no_snapshot_compaction,
+            experimental_no_edition_2024_for_dumps,
         } = self;

         export_to_env_if_not_present(MEILI_DB_PATH, db_path);
         export_to_env_if_not_present(MEILI_HTTP_ADDR, http_addr);
@@ -672,6 +684,10 @@ impl Opt {
             MEILI_EXPERIMENTAL_NO_SNAPSHOT_COMPACTION,
             experimental_no_snapshot_compaction.to_string(),
         );
+        export_to_env_if_not_present(
+            MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_DUMPS,
+            experimental_no_edition_2024_for_dumps.to_string(),
+        );
         indexer_options.export_to_env();
     }