mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-11-04 09:56:28 +00:00 
			
		
		
		
	Merge #4970
4970: Create a new export documents meilitool subcommand r=dureuill a=Kerollmops This subcommand can be useful for extracting documents from an existing database. Co-authored-by: Kerollmops <clement@meilisearch.com>
This commit is contained in:
		@@ -1,5 +1,5 @@
 | 
				
			|||||||
use std::fs::{read_dir, read_to_string, remove_file, File};
 | 
					use std::fs::{read_dir, read_to_string, remove_file, File};
 | 
				
			||||||
use std::io::BufWriter;
 | 
					use std::io::{BufWriter, Write as _};
 | 
				
			||||||
use std::path::PathBuf;
 | 
					use std::path::PathBuf;
 | 
				
			||||||
use std::time::Instant;
 | 
					use std::time::Instant;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -12,11 +12,14 @@ use meilisearch_types::heed::types::{SerdeJson, Str};
 | 
				
			|||||||
use meilisearch_types::heed::{
 | 
					use meilisearch_types::heed::{
 | 
				
			||||||
    CompactionOption, Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified,
 | 
					    CompactionOption, Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified,
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
 | 
					use meilisearch_types::milli::constants::RESERVED_VECTORS_FIELD_NAME;
 | 
				
			||||||
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
 | 
					use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
 | 
				
			||||||
 | 
					use meilisearch_types::milli::vector::parsed_vectors::{ExplicitVectors, VectorOrArrayOfVectors};
 | 
				
			||||||
use meilisearch_types::milli::{obkv_to_json, BEU32};
 | 
					use meilisearch_types::milli::{obkv_to_json, BEU32};
 | 
				
			||||||
use meilisearch_types::tasks::{Status, Task};
 | 
					use meilisearch_types::tasks::{Status, Task};
 | 
				
			||||||
use meilisearch_types::versioning::{get_version, parse_version};
 | 
					use meilisearch_types::versioning::{get_version, parse_version};
 | 
				
			||||||
use meilisearch_types::Index;
 | 
					use meilisearch_types::Index;
 | 
				
			||||||
 | 
					use serde_json::Value::Object;
 | 
				
			||||||
use time::macros::format_description;
 | 
					use time::macros::format_description;
 | 
				
			||||||
use time::OffsetDateTime;
 | 
					use time::OffsetDateTime;
 | 
				
			||||||
use upgrade::OfflineUpgrade;
 | 
					use upgrade::OfflineUpgrade;
 | 
				
			||||||
@@ -68,6 +71,24 @@ enum Command {
 | 
				
			|||||||
        skip_enqueued_tasks: bool,
 | 
					        skip_enqueued_tasks: bool,
 | 
				
			||||||
    },
 | 
					    },
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    /// Exports the documents of an index in NDJSON format from a Meilisearch index to stdout.
 | 
				
			||||||
 | 
					    ///
 | 
				
			||||||
 | 
					    /// This command can be executed on a running Meilisearch database. However, please note that
 | 
				
			||||||
 | 
					    /// it will maintain a read-only transaction for the duration of the extraction process.
 | 
				
			||||||
 | 
					    ExportDocuments {
 | 
				
			||||||
 | 
					        /// The index name to export the documents from.
 | 
				
			||||||
 | 
					        #[arg(long)]
 | 
				
			||||||
 | 
					        index_name: String,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        /// Do not export vectors with the documents.
 | 
				
			||||||
 | 
					        #[arg(long)]
 | 
				
			||||||
 | 
					        ignore_vectors: bool,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        /// The number of documents to skip.
 | 
				
			||||||
 | 
					        #[arg(long)]
 | 
				
			||||||
 | 
					        offset: Option<usize>,
 | 
				
			||||||
 | 
					    },
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    /// Attempts to upgrade from one major version to the next without a dump.
 | 
					    /// Attempts to upgrade from one major version to the next without a dump.
 | 
				
			||||||
    ///
 | 
					    ///
 | 
				
			||||||
    /// Make sure to run this command when Meilisearch is not running!
 | 
					    /// Make sure to run this command when Meilisearch is not running!
 | 
				
			||||||
@@ -114,6 +135,9 @@ fn main() -> anyhow::Result<()> {
 | 
				
			|||||||
        Command::ExportADump { dump_dir, skip_enqueued_tasks } => {
 | 
					        Command::ExportADump { dump_dir, skip_enqueued_tasks } => {
 | 
				
			||||||
            export_a_dump(db_path, dump_dir, skip_enqueued_tasks, detected_version)
 | 
					            export_a_dump(db_path, dump_dir, skip_enqueued_tasks, detected_version)
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					        Command::ExportDocuments { index_name, ignore_vectors, offset } => {
 | 
				
			||||||
 | 
					            export_documents(db_path, index_name, ignore_vectors, offset)
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
        Command::OfflineUpgrade { target_version } => {
 | 
					        Command::OfflineUpgrade { target_version } => {
 | 
				
			||||||
            let target_version = parse_version(&target_version).context("While parsing `--target-version`. Make sure `--target-version` is in the format MAJOR.MINOR.PATCH")?;
 | 
					            let target_version = parse_version(&target_version).context("While parsing `--target-version`. Make sure `--target-version` is in the format MAJOR.MINOR.PATCH")?;
 | 
				
			||||||
            OfflineUpgrade { db_path, current_version: detected_version, target_version }.upgrade()
 | 
					            OfflineUpgrade { db_path, current_version: detected_version, target_version }.upgrade()
 | 
				
			||||||
@@ -443,3 +467,106 @@ fn compact_index(db_path: PathBuf, index_name: &str) -> anyhow::Result<()> {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    bail!("Target index {index_name} not found!")
 | 
					    bail!("Target index {index_name} not found!")
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					fn export_documents(
 | 
				
			||||||
 | 
					    db_path: PathBuf,
 | 
				
			||||||
 | 
					    index_name: String,
 | 
				
			||||||
 | 
					    ignore_vectors: bool,
 | 
				
			||||||
 | 
					    offset: Option<usize>,
 | 
				
			||||||
 | 
					) -> anyhow::Result<()> {
 | 
				
			||||||
 | 
					    let index_scheduler_path = db_path.join("tasks");
 | 
				
			||||||
 | 
					    let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
 | 
				
			||||||
 | 
					        .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    let rtxn = env.read_txn()?;
 | 
				
			||||||
 | 
					    let index_mapping: Database<Str, UuidCodec> =
 | 
				
			||||||
 | 
					        try_opening_database(&env, &rtxn, "index-mapping")?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    for result in index_mapping.iter(&rtxn)? {
 | 
				
			||||||
 | 
					        let (uid, uuid) = result?;
 | 
				
			||||||
 | 
					        if uid == index_name {
 | 
				
			||||||
 | 
					            let index_path = db_path.join("indexes").join(uuid.to_string());
 | 
				
			||||||
 | 
					            let index =
 | 
				
			||||||
 | 
					                Index::new(EnvOpenOptions::new(), &index_path, false).with_context(|| {
 | 
				
			||||||
 | 
					                    format!("While trying to open the index at path {:?}", index_path.display())
 | 
				
			||||||
 | 
					                })?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            let rtxn = index.read_txn()?;
 | 
				
			||||||
 | 
					            let fields_ids_map = index.fields_ids_map(&rtxn)?;
 | 
				
			||||||
 | 
					            let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
 | 
				
			||||||
 | 
					            let embedding_configs = index.embedding_configs(&rtxn)?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            if let Some(offset) = offset {
 | 
				
			||||||
 | 
					                eprintln!("Skipping {offset} documents");
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            let mut stdout = BufWriter::new(std::io::stdout());
 | 
				
			||||||
 | 
					            let all_documents = index.documents_ids(&rtxn)?.into_iter().skip(offset.unwrap_or(0));
 | 
				
			||||||
 | 
					            for (i, ret) in index.iter_documents(&rtxn, all_documents)?.enumerate() {
 | 
				
			||||||
 | 
					                let (id, doc) = ret?;
 | 
				
			||||||
 | 
					                let mut document = obkv_to_json(&all_fields, &fields_ids_map, doc)?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                if i % 10_000 == 0 {
 | 
				
			||||||
 | 
					                    eprintln!("Starting the {}th document", i + offset.unwrap_or(0));
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                if !ignore_vectors {
 | 
				
			||||||
 | 
					                    'inject_vectors: {
 | 
				
			||||||
 | 
					                        let embeddings = index.embeddings(&rtxn, id)?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                        if embeddings.is_empty() {
 | 
				
			||||||
 | 
					                            break 'inject_vectors;
 | 
				
			||||||
 | 
					                        }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                        let vectors = document
 | 
				
			||||||
 | 
					                            .entry(RESERVED_VECTORS_FIELD_NAME)
 | 
				
			||||||
 | 
					                            .or_insert(Object(Default::default()));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                        let Object(vectors) = vectors else {
 | 
				
			||||||
 | 
					                            return Err(meilisearch_types::milli::Error::UserError(
 | 
				
			||||||
 | 
					                                meilisearch_types::milli::UserError::InvalidVectorsMapType {
 | 
				
			||||||
 | 
					                                    document_id: {
 | 
				
			||||||
 | 
					                                        if let Ok(Some(Ok(index))) = index
 | 
				
			||||||
 | 
					                                            .external_id_of(&rtxn, std::iter::once(id))
 | 
				
			||||||
 | 
					                                            .map(|it| it.into_iter().next())
 | 
				
			||||||
 | 
					                                        {
 | 
				
			||||||
 | 
					                                            index
 | 
				
			||||||
 | 
					                                        } else {
 | 
				
			||||||
 | 
					                                            format!("internal docid={id}")
 | 
				
			||||||
 | 
					                                        }
 | 
				
			||||||
 | 
					                                    },
 | 
				
			||||||
 | 
					                                    value: vectors.clone(),
 | 
				
			||||||
 | 
					                                },
 | 
				
			||||||
 | 
					                            )
 | 
				
			||||||
 | 
					                            .into());
 | 
				
			||||||
 | 
					                        };
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                        for (embedder_name, embeddings) in embeddings {
 | 
				
			||||||
 | 
					                            let user_provided = embedding_configs
 | 
				
			||||||
 | 
					                                .iter()
 | 
				
			||||||
 | 
					                                .find(|conf| conf.name == embedder_name)
 | 
				
			||||||
 | 
					                                .is_some_and(|conf| conf.user_provided.contains(id));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                            let embeddings = ExplicitVectors {
 | 
				
			||||||
 | 
					                                embeddings: Some(VectorOrArrayOfVectors::from_array_of_vectors(
 | 
				
			||||||
 | 
					                                    embeddings,
 | 
				
			||||||
 | 
					                                )),
 | 
				
			||||||
 | 
					                                regenerate: !user_provided,
 | 
				
			||||||
 | 
					                            };
 | 
				
			||||||
 | 
					                            vectors
 | 
				
			||||||
 | 
					                                .insert(embedder_name, serde_json::to_value(embeddings).unwrap());
 | 
				
			||||||
 | 
					                        }
 | 
				
			||||||
 | 
					                    }
 | 
				
			||||||
 | 
					                }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                serde_json::to_writer(&mut stdout, &document)?;
 | 
				
			||||||
 | 
					            }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            stdout.flush()?;
 | 
				
			||||||
 | 
					        } else {
 | 
				
			||||||
 | 
					            eprintln!("Found index {uid} but it's not the right index...");
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    Ok(())
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user