mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-31 16:06:31 +00:00 
			
		
		
		
	Add support to upgrade to v1.12.3 in meilitool
This commit is contained in:
		
							
								
								
									
										1
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										1
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							| @@ -3733,6 +3733,7 @@ dependencies = [ | ||||
|  "indexmap", | ||||
|  "meilisearch-auth", | ||||
|  "meilisearch-types", | ||||
|  "milli", | ||||
|  "serde", | ||||
|  "serde_json", | ||||
|  "tempfile", | ||||
|   | ||||
| @@ -17,6 +17,7 @@ file-store = { path = "../file-store" } | ||||
| indexmap = { version = "2.7.0", features = ["serde"] } | ||||
| meilisearch-auth = { path = "../meilisearch-auth" } | ||||
| meilisearch-types = { path = "../meilisearch-types" } | ||||
| milli = { path = "../milli" } | ||||
| serde = { version = "1.0.217", features = ["derive"] } | ||||
| serde_json = { version = "1.0.135", features = ["preserve_order"] } | ||||
| tempfile = "3.15.0" | ||||
|   | ||||
| @@ -8,7 +8,7 @@ use std::path::{Path, PathBuf}; | ||||
| use anyhow::{bail, Context}; | ||||
| use meilisearch_types::versioning::create_version_file; | ||||
| use v1_10::v1_9_to_v1_10; | ||||
| use v1_12::v1_11_to_v1_12; | ||||
| use v1_12::{v1_11_to_v1_12, v1_12_to_v1_12_3}; | ||||
|  | ||||
| use crate::upgrade::v1_11::v1_10_to_v1_11; | ||||
|  | ||||
| @@ -24,6 +24,7 @@ impl OfflineUpgrade { | ||||
|             (v1_9_to_v1_10 as fn(&Path) -> Result<(), anyhow::Error>, "1", "10", "0"), | ||||
|             (v1_10_to_v1_11, "1", "11", "0"), | ||||
|             (v1_11_to_v1_12, "1", "12", "0"), | ||||
|             (v1_12_to_v1_12_3, "1", "12", "3"), | ||||
|         ]; | ||||
|  | ||||
|         let (current_major, current_minor, current_patch) = &self.current_version; | ||||
| @@ -36,6 +37,7 @@ impl OfflineUpgrade { | ||||
|             ("1", "9", _) => 0, | ||||
|             ("1", "10", _) => 1, | ||||
|             ("1", "11", _) => 2, | ||||
|             ("1", "12", x) if x == "0" || x == "1" || x == "2" => 3, | ||||
|             _ => { | ||||
|                 bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from v1.9 and v1.10") | ||||
|             } | ||||
| @@ -46,7 +48,8 @@ impl OfflineUpgrade { | ||||
|         let ends_at = match (target_major.as_str(), target_minor.as_str(), target_patch.as_str()) { | ||||
|             ("1", "10", _) => 0, | ||||
|             ("1", "11", _) => 1, | ||||
|             ("1", "12", _) => 2, | ||||
|             ("1", "12", x) if x == "0" || x == "1" || x == "2" => 2, | ||||
|             ("1", "12", "3") => 3, | ||||
|             (major, _, _) if major.starts_with('v') => { | ||||
|                 bail!("Target version must not starts with a `v`. Instead of writing `v1.9.0` write `1.9.0` for example.") | ||||
|             } | ||||
|   | ||||
| @@ -1,16 +1,24 @@ | ||||
| //! The breaking changes that happened between the v1.11 and the v1.12 are: | ||||
| //! - The new indexer changed the update files format from OBKV to ndjson. https://github.com/meilisearch/meilisearch/pull/4900 | ||||
|  | ||||
| use std::borrow::Cow; | ||||
| use std::io::BufWriter; | ||||
| use std::path::Path; | ||||
| use std::sync::atomic::AtomicBool; | ||||
|  | ||||
| use anyhow::Context; | ||||
| use file_store::FileStore; | ||||
| use indexmap::IndexMap; | ||||
| use meilisearch_types::milli::documents::DocumentsBatchReader; | ||||
| use milli::heed::types::Str; | ||||
| use milli::heed::{Database, EnvOpenOptions}; | ||||
| use milli::progress::Step; | ||||
| use serde_json::value::RawValue; | ||||
| use tempfile::NamedTempFile; | ||||
|  | ||||
| use crate::try_opening_database; | ||||
| use crate::uuid_codec::UuidCodec; | ||||
|  | ||||
| pub fn v1_11_to_v1_12(db_path: &Path) -> anyhow::Result<()> { | ||||
|     println!("Upgrading from v1.11.0 to v1.12.0"); | ||||
|  | ||||
| @@ -19,6 +27,14 @@ pub fn v1_11_to_v1_12(db_path: &Path) -> anyhow::Result<()> { | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| pub fn v1_12_to_v1_12_3(db_path: &Path) -> anyhow::Result<()> { | ||||
|     println!("Upgrading from v1.12.{{0, 1, 2}} to v1.12.3"); | ||||
|  | ||||
|     rebuild_field_distribution(db_path)?; | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| /// Convert the update files from OBKV to ndjson format. | ||||
| /// | ||||
| /// 1) List all the update files using the file store. | ||||
| @@ -77,3 +93,113 @@ fn convert_update_files(db_path: &Path) -> anyhow::Result<()> { | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| /// Rebuild field distribution as it was wrongly computed in v1.12.x if x < 3 | ||||
| fn rebuild_field_distribution(db_path: &Path) -> anyhow::Result<()> { | ||||
|     let index_scheduler_path = db_path.join("tasks"); | ||||
|     let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) } | ||||
|         .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?; | ||||
|  | ||||
|     let sched_rtxn = env.read_txn()?; | ||||
|  | ||||
|     let index_mapping: Database<Str, UuidCodec> = | ||||
|         try_opening_database(&env, &sched_rtxn, "index-mapping")?; | ||||
|  | ||||
|     let index_count = | ||||
|         index_mapping.len(&sched_rtxn).context("while reading the number of indexes")?; | ||||
|  | ||||
|     let progress = milli::progress::Progress::default(); | ||||
|     let finished = AtomicBool::new(false); | ||||
|  | ||||
|     std::thread::scope(|scope| { | ||||
|         let indexes = index_mapping.iter(&sched_rtxn)?; | ||||
|  | ||||
|         let display_progress = std::thread::Builder::new() | ||||
|             .name("display_progress".into()) | ||||
|             .spawn_scoped(scope, || { | ||||
|                 while !finished.load(std::sync::atomic::Ordering::Relaxed) { | ||||
|                     std::thread::sleep(std::time::Duration::from_secs(5)); | ||||
|                     let view = progress.as_progress_view(); | ||||
|                     let Ok(view) = serde_json::to_string(&view) else { | ||||
|                         continue; | ||||
|                     }; | ||||
|                     println!("{view}"); | ||||
|                 } | ||||
|             }) | ||||
|             .unwrap(); | ||||
|  | ||||
|         for (index_index, result) in indexes.enumerate() { | ||||
|             let (uid, uuid) = result?; | ||||
|             progress.update_progress(VariableNameStep::new( | ||||
|                 uid, | ||||
|                 index_index as u32, | ||||
|                 index_count as u32, | ||||
|             )); | ||||
|             let index_path = db_path.join("indexes").join(uuid.to_string()); | ||||
|  | ||||
|             println!( | ||||
|                 "[{}/{index_count}]Updating index `{uid}` at `{}`", | ||||
|                 index_index + 1, | ||||
|                 index_path.display() | ||||
|             ); | ||||
|  | ||||
|             println!("\t- Rebuilding field distribution"); | ||||
|  | ||||
|             let index = | ||||
|                 milli::Index::new(EnvOpenOptions::new(), &index_path).with_context(|| { | ||||
|                     format!("while opening index {uid} at '{}'", index_path.display()) | ||||
|                 })?; | ||||
|  | ||||
|             let mut index_txn = index.write_txn()?; | ||||
|  | ||||
|             milli::update::new::reindex::field_distribution(&index, &mut index_txn, &progress) | ||||
|                 .context("while rebuilding field distribution")?; | ||||
|  | ||||
|             index_txn.commit().context("while committing the write txn for the updated index")?; | ||||
|         } | ||||
|  | ||||
|         sched_rtxn.commit().context("while committing the write txn for the index-scheduler")?; | ||||
|  | ||||
|         finished.store(true, std::sync::atomic::Ordering::Relaxed); | ||||
|  | ||||
|         if let Err(panic) = display_progress.join() { | ||||
|             let msg = match panic.downcast_ref::<&'static str>() { | ||||
|                 Some(s) => *s, | ||||
|                 None => match panic.downcast_ref::<String>() { | ||||
|                     Some(s) => &s[..], | ||||
|                     None => "Box<dyn Any>", | ||||
|                 }, | ||||
|             }; | ||||
|             eprintln!("WARN: the display thread panicked with {msg}"); | ||||
|         } | ||||
|  | ||||
|         println!("Upgrading database succeeded"); | ||||
|         Ok(()) | ||||
|     }) | ||||
| } | ||||
|  | ||||
/// A progress `Step` whose displayed name is chosen at runtime — used here to
/// show the uid of the index currently being upgraded.
pub struct VariableNameStep {
    /// Human-readable label reported by `Step::name`.
    name: String,
    /// Zero-based position of this step, reported by `Step::current`.
    current: u32,
    /// Total number of steps, reported by `Step::total`.
    total: u32,
}
|  | ||||
| impl VariableNameStep { | ||||
|     pub fn new(name: impl Into<String>, current: u32, total: u32) -> Self { | ||||
|         Self { name: name.into(), current, total } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl Step for VariableNameStep { | ||||
|     fn name(&self) -> Cow<'static, str> { | ||||
|         self.name.clone().into() | ||||
|     } | ||||
|  | ||||
|     fn current(&self) -> u32 { | ||||
|         self.current | ||||
|     } | ||||
|  | ||||
|     fn total(&self) -> u32 { | ||||
|         self.total | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -16,6 +16,7 @@ pub mod indexer; | ||||
| mod merger; | ||||
| mod parallel_iterator_ext; | ||||
| mod ref_cell_ext; | ||||
| pub mod reindex; | ||||
| pub(crate) mod steps; | ||||
| pub(crate) mod thread_local; | ||||
| pub mod vector_document; | ||||
|   | ||||
							
								
								
									
										47
									
								
								crates/milli/src/update/new/reindex.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										47
									
								
								crates/milli/src/update/new/reindex.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,47 @@ | ||||
| use heed::RwTxn; | ||||
|  | ||||
| use super::document::{Document, DocumentFromDb}; | ||||
| use crate::progress::{self, AtomicSubStep, NamedStep, Progress}; | ||||
| use crate::{FieldDistribution, Index, Result}; | ||||
|  | ||||
| pub fn field_distribution(index: &Index, wtxn: &mut RwTxn<'_>, progress: &Progress) -> Result<()> { | ||||
|     let mut distribution = FieldDistribution::new(); | ||||
|  | ||||
|     let document_count = index.number_of_documents(wtxn)?; | ||||
|     let field_id_map = index.fields_ids_map(wtxn)?; | ||||
|  | ||||
|     let (update_document_count, sub_step) = | ||||
|         AtomicSubStep::<progress::Document>::new(document_count as u32); | ||||
|     progress.update_progress(sub_step); | ||||
|  | ||||
|     let docids = index.documents_ids(wtxn)?; | ||||
|  | ||||
|     for docid in docids { | ||||
|         update_document_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); | ||||
|  | ||||
|         let Some(document) = DocumentFromDb::new(docid, wtxn, index, &field_id_map)? else { | ||||
|             continue; | ||||
|         }; | ||||
|         let geo_iter = document.geo_field().transpose().map(|res| res.map(|rv| ("_geo", rv))); | ||||
|         for res in document.iter_top_level_fields().chain(geo_iter) { | ||||
|             let (field_name, _) = res?; | ||||
|             if let Some(count) = distribution.get_mut(field_name) { | ||||
|                 *count += 1; | ||||
|             } else { | ||||
|                 distribution.insert(field_name.to_owned(), 1); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     index.put_field_distribution(wtxn, &distribution)?; | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
/// Marker type implementing `NamedStep` so that a progress sub-step renders
/// under the name "documents".
///
/// NOTE(review): not referenced elsewhere in this file; presumably consumed by
/// the progress machinery or external callers — confirm before removing.
#[derive(Default)]
pub struct FieldDistributionIndexProgress;

impl NamedStep for FieldDistributionIndexProgress {
    fn name(&self) -> &'static str {
        "documents"
    }
}
		Reference in New Issue
	
	Block a user