mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-25 21:16:28 +00:00 
			
		
		
		
	meilitool: Support dumpless upgrade from v1.9 to v1.10 when there are no REST embedders
This commit is contained in:
		| @@ -2,7 +2,7 @@ use std::fs::{read_dir, read_to_string, remove_file, File}; | ||||
| use std::io::BufWriter; | ||||
| use std::path::PathBuf; | ||||
|  | ||||
| use anyhow::Context; | ||||
| use anyhow::{bail, Context}; | ||||
| use clap::{Parser, Subcommand}; | ||||
| use dump::{DumpWriter, IndexMetadata}; | ||||
| use file_store::FileStore; | ||||
| @@ -10,9 +10,10 @@ use meilisearch_auth::AuthController; | ||||
| use meilisearch_types::heed::types::{SerdeJson, Str}; | ||||
| use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified}; | ||||
| use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader}; | ||||
| use meilisearch_types::milli::index::{db_name, main_key}; | ||||
| use meilisearch_types::milli::{obkv_to_json, BEU32}; | ||||
| use meilisearch_types::tasks::{Status, Task}; | ||||
| use meilisearch_types::versioning::check_version_file; | ||||
| use meilisearch_types::versioning::{create_version_file, get_version, parse_version}; | ||||
| use meilisearch_types::Index; | ||||
| use time::macros::format_description; | ||||
| use time::OffsetDateTime; | ||||
| @@ -62,21 +63,404 @@ enum Command { | ||||
|         #[arg(long)] | ||||
|         skip_enqueued_tasks: bool, | ||||
|     }, | ||||
|  | ||||
|     /// Attempts to upgrade from one major version to the next without a dump. | ||||
|     /// | ||||
|     /// Make sure to run this command when Meilisearch is not running! | ||||
|     /// If Meilisearch is running while executing this command, the database could be corrupted | ||||
|     /// (contain data from both the old and the new versions) | ||||
|     /// | ||||
|     /// Supported upgrade paths: | ||||
|     /// | ||||
|     /// - v1.9.0 -> v1.10.0 | ||||
|     OfflineUpgrade { | ||||
|         #[arg(long)] | ||||
|         target_version: String, | ||||
|     }, | ||||
| } | ||||
|  | ||||
| /// Entry point: parses the command line and dispatches to the selected command. | ||||
| /// | ||||
| /// `ClearTaskQueue` and `ExportADump` run `check_version_file` on the database before doing | ||||
| /// anything else. `OfflineUpgrade` only reads the version with `get_version`: its purpose is | ||||
| /// to operate on a database written by an older version, so it must not be gated by the | ||||
| /// check that guards the other commands. | ||||
| /// | ||||
| /// # Errors | ||||
| /// | ||||
| /// Returns an error when the version file cannot be checked or read, when `--target-version` | ||||
| /// cannot be parsed, or when the selected command fails. | ||||
| fn main() -> anyhow::Result<()> { | ||||
|     let Cli { db_path, command } = Cli::parse(); | ||||
|  | ||||
|     match command { | ||||
|         Command::ClearTaskQueue => { | ||||
|             check_version_file(&db_path).context("While checking the version file")?; | ||||
|             clear_task_queue(db_path) | ||||
|         } | ||||
|         Command::ExportADump { dump_dir, skip_enqueued_tasks } => { | ||||
|             check_version_file(&db_path).context("While checking the version file")?; | ||||
|             export_a_dump(db_path, dump_dir, skip_enqueued_tasks) | ||||
|         } | ||||
|         Command::OfflineUpgrade { target_version } => { | ||||
|             // NOTE(review): presumably `check_version_file` rejects a database whose VERSION | ||||
|             // differs from this binary's version, which would make every upgrade fail before | ||||
|             // it starts; confirm against `meilisearch_types::versioning`. | ||||
|             let detected_version = | ||||
|                 get_version(&db_path).context("While checking the version file")?; | ||||
|             let target_version = parse_version(&target_version).context("While parsing `--target-version`. Make sure `--target-version` is in the format MAJOR.MINOR.PATCH")?; | ||||
|             OfflineUpgrade { db_path, current_version: detected_version, target_version }.upgrade() | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// State needed to upgrade a database in place, without going through a dump. | ||||
| struct OfflineUpgrade { | ||||
|     /// Root of the database directory; `tasks/`, `indexes/` and the VERSION file live under it. | ||||
|     db_path: PathBuf, | ||||
|     /// `(major, minor, patch)` read from the database's version file. | ||||
|     current_version: (String, String, String), | ||||
|     /// `(major, minor, patch)` requested with `--target-version`. | ||||
|     target_version: (String, String, String), | ||||
| } | ||||
|  | ||||
| impl OfflineUpgrade { | ||||
|     /// Validates the version pair, upgrades the database, then rewrites the VERSION file. | ||||
|     /// | ||||
|     /// Only `1.9.x -> 1.10.x` is accepted; any other pair bails before anything is opened. | ||||
|     /// | ||||
|     /// # Errors | ||||
|     /// | ||||
|     /// Returns an error when the current or target version is unsupported, when the database | ||||
|     /// upgrade fails, or when the VERSION file cannot be written. | ||||
|     fn upgrade(self) -> anyhow::Result<()> { | ||||
|         // TODO: if we make this process support more versions, introduce a more flexible way of checking for the version | ||||
|         // currently only supports v1.9 to v1.10 | ||||
|         let (current_major, current_minor, current_patch) = &self.current_version; | ||||
|  | ||||
|         match (current_major.as_str(), current_minor.as_str(), current_patch.as_str()) { | ||||
|             ("1", "9", _) => {} | ||||
|             _ => { | ||||
|                 bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from v1.9") | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         let (target_major, target_minor, target_patch) = &self.target_version; | ||||
|  | ||||
|         match (target_major.as_str(), target_minor.as_str(), target_patch.as_str()) { | ||||
|             ("1", "10", _) => {} | ||||
|             _ => { | ||||
|                 bail!("Unsupported target version {target_major}.{target_minor}.{target_patch}. Can only upgrade to v1.10") | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         println!("Upgrading from {current_major}.{current_minor}.{current_patch} to {target_major}.{target_minor}.{target_patch}"); | ||||
|  | ||||
|         self.v1_9_to_v1_10()?; | ||||
|  | ||||
|         println!("Writing VERSION file"); | ||||
|  | ||||
|         create_version_file(&self.db_path, target_major, target_minor, target_patch) | ||||
|             .context("while writing VERSION file after the upgrade")?; | ||||
|  | ||||
|         println!("Success"); | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     /// Opens the environment of index `uid` stored at `index_path`. | ||||
|     fn open_index_env(uid: &str, index_path: &std::path::Path) -> anyhow::Result<Env> { | ||||
|         // FIXME: fetch the 25 magic number from the index file | ||||
|         // NOTE(review): opening the environment is `unsafe`; this relies on the command's | ||||
|         // documented requirement that Meilisearch is not running on the same database. | ||||
|         unsafe { EnvOpenOptions::new().max_dbs(25).open(index_path) } | ||||
|             .with_context(|| format!("while opening index {uid} at '{}'", index_path.display())) | ||||
|     } | ||||
|  | ||||
|     /// Applies the v1.9 -> v1.10 changes to every index listed in the index scheduler. | ||||
|     /// | ||||
|     /// Works in two passes so that an unsupported index is detected before anything is | ||||
|     /// written: | ||||
|     /// | ||||
|     /// 1. every index is opened read-only and checked for REST embedders; | ||||
|     /// 2. only then are the index stats and the date format rewritten, index by index. | ||||
|     /// | ||||
|     /// Each index transaction is committed on its own. If the check ran inside the same loop | ||||
|     /// as the writes, a REST embedder found on a later index would leave the earlier indexes | ||||
|     /// already converted while the VERSION file still says v1.9, and a retry would then try | ||||
|     /// to read the converted dates with the old format. | ||||
|     /// | ||||
|     /// # Errors | ||||
|     /// | ||||
|     /// Returns an error when an environment, database or transaction cannot be opened, when | ||||
|     /// an index has a REST embedder, or when a read, write or commit fails. | ||||
|     fn v1_9_to_v1_10(&self) -> anyhow::Result<()> { | ||||
|         // 2 changes here | ||||
|  | ||||
|         // 1. date format. needs to be done before opening the Index | ||||
|         // 2. REST embedders. We don't support this case right now, so bail | ||||
|  | ||||
|         let index_scheduler_path = self.db_path.join("tasks"); | ||||
|         // NOTE(review): same soundness assumption as for the index environments: no other | ||||
|         // process may be using this database while the tool runs. | ||||
|         let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) } | ||||
|             .with_context(|| { | ||||
|                 format!("While trying to open {:?}", index_scheduler_path.display()) | ||||
|             })?; | ||||
|  | ||||
|         let mut sched_wtxn = env.write_txn()?; | ||||
|  | ||||
|         let index_mapping: Database<Str, UuidCodec> = | ||||
|             try_opening_database(&env, &sched_wtxn, "index-mapping")?; | ||||
|  | ||||
|         let index_stats: Database<UuidCodec, Unspecified> = | ||||
|             try_opening_database(&env, &sched_wtxn, "index-stats").with_context(|| { | ||||
|                 format!("While trying to open {:?}", index_scheduler_path.display()) | ||||
|             })?; | ||||
|  | ||||
|         let index_count = | ||||
|             index_mapping.len(&sched_wtxn).context("while reading the number of indexes")?; | ||||
|  | ||||
|         // FIXME: not ideal, we have to pre-populate all indexes to prevent double borrow of sched_wtxn | ||||
|         // 1. immutably for the iteration | ||||
|         // 2. mutably for updating index stats | ||||
|         let indexes = index_mapping | ||||
|             .iter(&sched_wtxn)? | ||||
|             .map(|res| res.map(|(uid, uuid)| (uid.to_owned(), uuid))) | ||||
|             .collect::<Result<Vec<_>, _>>()?; | ||||
|  | ||||
|         // First pass: read-only validation of every index, so that we either upgrade all of | ||||
|         // them or touch none of them. | ||||
|         for (index_index, (uid, uuid)) in indexes.iter().enumerate() { | ||||
|             let index_path = self.db_path.join("indexes").join(uuid.to_string()); | ||||
|  | ||||
|             println!( | ||||
|                 "[{}/{index_count}]Checking that the upgrade can take place for index {uid} at '{}'", | ||||
|                 index_index + 1, | ||||
|                 index_path.display() | ||||
|             ); | ||||
|  | ||||
|             let index_env = Self::open_index_env(uid, &index_path)?; | ||||
|  | ||||
|             let index_rtxn = index_env.read_txn().with_context(|| { | ||||
|                 format!( | ||||
|                     "while obtaining a read transaction for index {uid} at {}", | ||||
|                     index_path.display() | ||||
|                 ) | ||||
|             })?; | ||||
|  | ||||
|             println!("\tChecking for incompatible embedders (REST embedders)"); | ||||
|             check_rest_embedder(uid, &index_env, &index_rtxn)?; | ||||
|         } | ||||
|  | ||||
|         // Second pass: every index passed the check, perform the actual upgrade. | ||||
|         for (index_index, (uid, uuid)) in indexes.into_iter().enumerate() { | ||||
|             let index_path = self.db_path.join("indexes").join(uuid.to_string()); | ||||
|  | ||||
|             // Progress is reported 1-based so that the last index prints `N/N`. | ||||
|             println!( | ||||
|                 "[{}/{index_count}]Updating index {uid} at '{}'", | ||||
|                 index_index + 1, | ||||
|                 index_path.display() | ||||
|             ); | ||||
|  | ||||
|             let index_env = Self::open_index_env(&uid, &index_path)?; | ||||
|  | ||||
|             let mut index_wtxn = index_env.write_txn().with_context(|| { | ||||
|                 format!( | ||||
|                     "while obtaining a write transaction for index {uid} at {}", | ||||
|                     index_path.display() | ||||
|                 ) | ||||
|             })?; | ||||
|  | ||||
|             println!("\tUpdating index stats"); | ||||
|             update_index_stats(index_stats, &uid, uuid, &mut sched_wtxn)?; | ||||
|             println!("\tUpdating date format"); | ||||
|             update_date_format(&uid, &index_env, &mut index_wtxn)?; | ||||
|  | ||||
|             index_wtxn.commit().with_context(|| { | ||||
|                 format!( | ||||
|                     "while committing the write txn for index {uid} at {}", | ||||
|                     index_path.display() | ||||
|                 ) | ||||
|             })?; | ||||
|         } | ||||
|  | ||||
|         sched_wtxn.commit().context("while committing the write txn for the index-scheduler")?; | ||||
|  | ||||
|         println!("Upgrading database succeeded"); | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Minimal copies of the v1.9 types, kept only so that data stored by v1.9 can be decoded. | ||||
| pub mod v1_9 { | ||||
|     use std::collections::BTreeMap; | ||||
|  | ||||
|     use serde::{Deserialize, Serialize}; | ||||
|  | ||||
|     /// Dates use the default serde representation of `time::OffsetDateTime` here. | ||||
|     pub type OffsetDateTime = time::OffsetDateTime; | ||||
|  | ||||
|     /// Field name -> number of occurrences of that field in the documents. | ||||
|     pub type FieldDistribution = BTreeMap<String, u64>; | ||||
|  | ||||
|     /// The statistics that can be computed from an `Index` object. | ||||
|     #[derive(Debug, Serialize, Deserialize)] | ||||
|     pub struct IndexStats { | ||||
|         /// Number of documents in the index. | ||||
|         pub number_of_documents: u64, | ||||
|         /// Size taken up by the index' DB, in bytes. | ||||
|         /// | ||||
|         /// Counts both used and free pages; since free pages are not returned to the disk | ||||
|         /// after a deletion, this is typically larger than `used_database_size`. | ||||
|         pub database_size: u64, | ||||
|         /// Size taken by the used pages of the index' DB, in bytes. | ||||
|         /// | ||||
|         /// Typically smaller than `database_size`, which also counts the unused pages. | ||||
|         pub used_database_size: u64, | ||||
|         /// Association of every field name with the number of times it occurs in the documents. | ||||
|         pub field_distribution: FieldDistribution, | ||||
|         /// Creation date of the index. | ||||
|         pub created_at: OffsetDateTime, | ||||
|         /// Date of the last update of the index. | ||||
|         pub updated_at: OffsetDateTime, | ||||
|     } | ||||
|  | ||||
|     /// A named embedder configuration. | ||||
|     #[derive(Debug, Serialize, Deserialize)] | ||||
|     pub struct IndexEmbeddingConfig { | ||||
|         pub name: String, | ||||
|         pub config: EmbeddingConfig, | ||||
|     } | ||||
|  | ||||
|     /// Configuration of a single embedder. | ||||
|     #[derive(Debug, Clone, Default, Serialize, Deserialize)] | ||||
|     pub struct EmbeddingConfig { | ||||
|         /// Options of the embedder, specific to each kind of embedder | ||||
|         pub embedder_options: EmbedderOptions, | ||||
|     } | ||||
|  | ||||
|     /// Options of an embedder, specific to each kind of embedder. | ||||
|     #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] | ||||
|     pub enum EmbedderOptions { | ||||
|         HuggingFace(hf::EmbedderOptions), | ||||
|         OpenAi(openai::EmbedderOptions), | ||||
|         Ollama(ollama::EmbedderOptions), | ||||
|         UserProvided(manual::EmbedderOptions), | ||||
|         Rest(rest::EmbedderOptions), | ||||
|     } | ||||
|  | ||||
|     impl Default for EmbedderOptions { | ||||
|         /// The default is an OpenAI embedder with neither API key nor dimensions set. | ||||
|         fn default() -> Self { | ||||
|             let options = openai::EmbedderOptions { api_key: None, dimensions: None }; | ||||
|             EmbedderOptions::OpenAi(options) | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     mod hf { | ||||
|         use serde::{Deserialize, Serialize}; | ||||
|  | ||||
|         #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] | ||||
|         pub struct EmbedderOptions { | ||||
|             pub model: String, | ||||
|             pub revision: Option<String>, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     mod openai { | ||||
|         use serde::{Deserialize, Serialize}; | ||||
|  | ||||
|         #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] | ||||
|         pub struct EmbedderOptions { | ||||
|             pub api_key: Option<String>, | ||||
|             pub dimensions: Option<usize>, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     mod ollama { | ||||
|         use serde::{Deserialize, Serialize}; | ||||
|  | ||||
|         #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] | ||||
|         pub struct EmbedderOptions { | ||||
|             pub embedding_model: String, | ||||
|             pub url: Option<String>, | ||||
|             pub api_key: Option<String>, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     mod manual { | ||||
|         use serde::{Deserialize, Serialize}; | ||||
|  | ||||
|         #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] | ||||
|         pub struct EmbedderOptions { | ||||
|             pub dimensions: usize, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     mod rest { | ||||
|         use serde::{Deserialize, Serialize}; | ||||
|  | ||||
|         #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] | ||||
|         pub struct EmbedderOptions { | ||||
|             pub api_key: Option<String>, | ||||
|             pub dimensions: Option<usize>, | ||||
|             pub url: String, | ||||
|             pub input_field: Vec<String>, | ||||
|             // path to the array of embeddings | ||||
|             pub path_to_embeddings: Vec<String>, | ||||
|             // shape of a single embedding | ||||
|             pub embedding_object: Vec<String>, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// v1.10 counterparts of the `v1_9` types: same data, dates serialized as RFC 3339. | ||||
| pub mod v1_10 { | ||||
|     use std::collections::BTreeMap; | ||||
|  | ||||
|     use serde::{Deserialize, Serialize}; | ||||
|  | ||||
|     use crate::v1_9; | ||||
|  | ||||
|     /// Field name -> number of occurrences of that field in the documents. | ||||
|     pub type FieldDistribution = BTreeMap<String, u64>; | ||||
|  | ||||
|     /// The statistics that can be computed from an `Index` object. | ||||
|     #[derive(Debug, Serialize, Deserialize)] | ||||
|     pub struct IndexStats { | ||||
|         /// Number of documents in the index. | ||||
|         pub number_of_documents: u64, | ||||
|         /// Size taken up by the index' DB, in bytes. | ||||
|         /// | ||||
|         /// Counts both used and free pages; since free pages are not returned to the disk | ||||
|         /// after a deletion, this is typically larger than `used_database_size`. | ||||
|         pub database_size: u64, | ||||
|         /// Size taken by the used pages of the index' DB, in bytes. | ||||
|         /// | ||||
|         /// Typically smaller than `database_size`, which also counts the unused pages. | ||||
|         pub used_database_size: u64, | ||||
|         /// Association of every field name with the number of times it occurs in the documents. | ||||
|         pub field_distribution: FieldDistribution, | ||||
|         /// Creation date of the index. | ||||
|         #[serde(with = "time::serde::rfc3339")] | ||||
|         pub created_at: time::OffsetDateTime, | ||||
|         /// Date of the last update of the index. | ||||
|         #[serde(with = "time::serde::rfc3339")] | ||||
|         pub updated_at: time::OffsetDateTime, | ||||
|     } | ||||
|  | ||||
|     impl From<v1_9::IndexStats> for IndexStats { | ||||
|         /// Field-by-field copy; only the serialized representation of the dates differs. | ||||
|         fn from(old: v1_9::IndexStats) -> Self { | ||||
|             Self { | ||||
|                 number_of_documents: old.number_of_documents, | ||||
|                 database_size: old.database_size, | ||||
|                 used_database_size: old.used_database_size, | ||||
|                 field_distribution: old.field_distribution, | ||||
|                 created_at: old.created_at, | ||||
|                 updated_at: old.updated_at, | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /// Wrapper that serializes the inner date as RFC 3339. | ||||
|     #[derive(Serialize, Deserialize)] | ||||
|     #[serde(transparent)] | ||||
|     pub struct OffsetDateTime(#[serde(with = "time::serde::rfc3339")] pub time::OffsetDateTime); | ||||
| } | ||||
|  | ||||
| /// Rewrites the stats entry of index `index_uuid` from the v1.9 to the v1.10 representation. | ||||
| /// | ||||
| /// The entry is read as `v1_9::IndexStats`, converted, and stored back under the same key as | ||||
| /// `v1_10::IndexStats`. Nothing is written when the index has no stats entry. | ||||
| /// | ||||
| /// # Errors | ||||
| /// | ||||
| /// Returns an error, with the index uid as context, when the read or the write fails. | ||||
| fn update_index_stats( | ||||
|     index_stats: Database<UuidCodec, Unspecified>, | ||||
|     index_uid: &str, | ||||
|     index_uuid: uuid::Uuid, | ||||
|     sched_wtxn: &mut RwTxn, | ||||
| ) -> anyhow::Result<()> { | ||||
|     let ctx = || format!("while updating index stats for index {index_uid}"); | ||||
|  | ||||
|     let old_db = index_stats.remap_data_type::<SerdeJson<v1_9::IndexStats>>(); | ||||
|     let old_stats = old_db.get(sched_wtxn, &index_uuid).with_context(ctx)?; | ||||
|  | ||||
|     match old_stats { | ||||
|         // No entry for this index: nothing to convert. | ||||
|         None => Ok(()), | ||||
|         Some(old_stats) => { | ||||
|             let new_db = index_stats.remap_data_type::<SerdeJson<v1_10::IndexStats>>(); | ||||
|             let new_stats = v1_10::IndexStats::from(old_stats); | ||||
|             new_db.put(sched_wtxn, &index_uuid, &new_stats).with_context(ctx) | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Converts the creation and last-update dates stored in the main database of an index. | ||||
| /// | ||||
| /// Both `main_key::CREATED_AT_KEY` and `main_key::UPDATED_AT_KEY` go through | ||||
| /// `date_round_trip`, in that order; the first failure aborts the function. | ||||
| fn update_date_format( | ||||
|     index_uid: &str, | ||||
|     index_env: &Env, | ||||
|     index_wtxn: &mut RwTxn, | ||||
| ) -> anyhow::Result<()> { | ||||
|     let main_db = try_opening_poly_database(index_env, index_wtxn, db_name::MAIN) | ||||
|         .with_context(|| format!("while updating date format for index {index_uid}"))?; | ||||
|  | ||||
|     for date_key in [main_key::CREATED_AT_KEY, main_key::UPDATED_AT_KEY] { | ||||
|         date_round_trip(index_wtxn, index_uid, main_db, date_key)?; | ||||
|     } | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| /// Fails if the index has at least one REST embedder configured. | ||||
| /// | ||||
| /// Reads the embedding configurations stored under `main_key::EMBEDDING_CONFIGS` in the main | ||||
| /// database, decoded with the `v1_9` types. A missing entry is treated as "no embedders". | ||||
| /// | ||||
| /// # Errors | ||||
| /// | ||||
| /// Returns an error naming the first REST embedder found, or an error carrying the index uid | ||||
| /// as context when the main database cannot be opened or the configurations cannot be read. | ||||
| fn check_rest_embedder(index_uid: &str, index_env: &Env, index_txn: &RoTxn) -> anyhow::Result<()> { | ||||
|     let ctx = || format!("while checking REST embedders for index {index_uid}"); | ||||
|  | ||||
|     let main = try_opening_poly_database(index_env, index_txn, db_name::MAIN).with_context(ctx)?; | ||||
|  | ||||
|     // Attach the index uid to read/decoding failures too, like the sibling helpers do. | ||||
|     let configs = main | ||||
|         .remap_types::<Str, SerdeJson<Vec<v1_9::IndexEmbeddingConfig>>>() | ||||
|         .get(index_txn, main_key::EMBEDDING_CONFIGS) | ||||
|         .with_context(ctx)? | ||||
|         .unwrap_or_default(); | ||||
|  | ||||
|     for config in configs { | ||||
|         if matches!(config.config.embedder_options, v1_9::EmbedderOptions::Rest(_)) { | ||||
|             bail!( | ||||
|                 "index {index_uid} has a REST embedder: {}. \ | ||||
|                 REST embedders are unsupported for upgrade. \ | ||||
|                 Remove the embedder and retry.", | ||||
|                 config.name | ||||
|             ) | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| /// Re-encodes the date stored under `key` from the v1.9 to the v1.10 representation. | ||||
| /// | ||||
| /// The value is decoded as `v1_9::OffsetDateTime` and written back under the same key as | ||||
| /// `v1_10::OffsetDateTime`. Nothing is written when `key` is absent. | ||||
| /// | ||||
| /// # Errors | ||||
| /// | ||||
| /// Returns an error naming `key` and `index_uid` when the read or the write fails. | ||||
| fn date_round_trip( | ||||
|     wtxn: &mut RwTxn, | ||||
|     index_uid: &str, | ||||
|     db: Database<Unspecified, Unspecified>, | ||||
|     key: &str, | ||||
| ) -> anyhow::Result<()> { | ||||
|     let old_db = db.remap_types::<Str, SerdeJson<v1_9::OffsetDateTime>>(); | ||||
|     let stored = old_db.get(wtxn, key).with_context(|| { | ||||
|         format!("could not read `{key}` while updating date format for index {index_uid}") | ||||
|     })?; | ||||
|  | ||||
|     match stored { | ||||
|         // Key not present: nothing to re-encode. | ||||
|         None => Ok(()), | ||||
|         Some(datetime) => { | ||||
|             let new_db = db.remap_types::<Str, SerdeJson<v1_10::OffsetDateTime>>(); | ||||
|             new_db.put(wtxn, key, &v1_10::OffsetDateTime(datetime)).with_context(|| { | ||||
|                 format!("could not write `{key}` while updating date format for index {index_uid}") | ||||
|             }) | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Clears the task queue located at `db_path`. | ||||
| fn clear_task_queue(db_path: PathBuf) -> anyhow::Result<()> { | ||||
|     let path = db_path.join("tasks"); | ||||
|   | ||||
		Reference in New Issue
	
	Block a user