mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-21 19:16:28 +00:00 
			
		
		
		
	Merge pull request #5731 from meilisearch/chat-route-support-dumps
Export and import chat completions workspace settings in dumps
This commit is contained in:
		| @@ -116,6 +116,15 @@ impl DumpReader { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     pub fn chat_completions_settings( | ||||||
|  |         &mut self, | ||||||
|  |     ) -> Result<Box<dyn Iterator<Item = Result<(String, v6::ChatCompletionSettings)>> + '_>> { | ||||||
|  |         match self { | ||||||
|  |             DumpReader::Current(current) => current.chat_completions_settings(), | ||||||
|  |             DumpReader::Compat(_compat) => Ok(Box::new(std::iter::empty())), | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|     pub fn features(&self) -> Result<Option<v6::RuntimeTogglableFeatures>> { |     pub fn features(&self) -> Result<Option<v6::RuntimeTogglableFeatures>> { | ||||||
|         match self { |         match self { | ||||||
|             DumpReader::Current(current) => Ok(current.features()), |             DumpReader::Current(current) => Ok(current.features()), | ||||||
|   | |||||||
| @@ -1,3 +1,4 @@ | |||||||
|  | use std::ffi::OsStr; | ||||||
| use std::fs::{self, File}; | use std::fs::{self, File}; | ||||||
| use std::io::{BufRead, BufReader, ErrorKind}; | use std::io::{BufRead, BufReader, ErrorKind}; | ||||||
| use std::path::Path; | use std::path::Path; | ||||||
| @@ -21,6 +22,7 @@ pub type Unchecked = meilisearch_types::settings::Unchecked; | |||||||
| pub type Task = crate::TaskDump; | pub type Task = crate::TaskDump; | ||||||
| pub type Batch = meilisearch_types::batches::Batch; | pub type Batch = meilisearch_types::batches::Batch; | ||||||
| pub type Key = meilisearch_types::keys::Key; | pub type Key = meilisearch_types::keys::Key; | ||||||
|  | pub type ChatCompletionSettings = meilisearch_types::features::ChatCompletionSettings; | ||||||
| pub type RuntimeTogglableFeatures = meilisearch_types::features::RuntimeTogglableFeatures; | pub type RuntimeTogglableFeatures = meilisearch_types::features::RuntimeTogglableFeatures; | ||||||
| pub type Network = meilisearch_types::features::Network; | pub type Network = meilisearch_types::features::Network; | ||||||
|  |  | ||||||
| @@ -192,6 +194,34 @@ impl V6Reader { | |||||||
|         ) |         ) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     pub fn chat_completions_settings( | ||||||
|  |         &mut self, | ||||||
|  |     ) -> Result<Box<dyn Iterator<Item = Result<(String, ChatCompletionSettings)>> + '_>> { | ||||||
|  |         let entries = match fs::read_dir(self.dump.path().join("chat-completions-settings")) { | ||||||
|  |             Ok(entries) => entries, | ||||||
|  |             Err(e) if e.kind() == ErrorKind::NotFound => return Ok(Box::new(std::iter::empty())), | ||||||
|  |             Err(e) => return Err(e.into()), | ||||||
|  |         }; | ||||||
|  |         Ok(Box::new( | ||||||
|  |             entries | ||||||
|  |                 .map(|entry| -> Result<Option<_>> { | ||||||
|  |                     let entry = entry?; | ||||||
|  |                     let file_name = entry.file_name(); | ||||||
|  |                     let path = Path::new(&file_name); | ||||||
|  |                     if entry.file_type()?.is_file() && path.extension() == Some(OsStr::new("json")) | ||||||
|  |                     { | ||||||
|  |                         let name = path.file_stem().unwrap().to_str().unwrap().to_string(); | ||||||
|  |                         let file = File::open(entry.path())?; | ||||||
|  |                         let settings = serde_json::from_reader(file)?; | ||||||
|  |                         Ok(Some((name, settings))) | ||||||
|  |                     } else { | ||||||
|  |                         Ok(None) | ||||||
|  |                     } | ||||||
|  |                 }) | ||||||
|  |                 .filter_map(|entry| entry.transpose()), | ||||||
|  |         )) | ||||||
|  |     } | ||||||
|  |  | ||||||
|     pub fn features(&self) -> Option<RuntimeTogglableFeatures> { |     pub fn features(&self) -> Option<RuntimeTogglableFeatures> { | ||||||
|         self.features |         self.features | ||||||
|     } |     } | ||||||
|   | |||||||
| @@ -5,7 +5,7 @@ use std::path::PathBuf; | |||||||
| use flate2::write::GzEncoder; | use flate2::write::GzEncoder; | ||||||
| use flate2::Compression; | use flate2::Compression; | ||||||
| use meilisearch_types::batches::Batch; | use meilisearch_types::batches::Batch; | ||||||
| use meilisearch_types::features::{Network, RuntimeTogglableFeatures}; | use meilisearch_types::features::{ChatCompletionSettings, Network, RuntimeTogglableFeatures}; | ||||||
| use meilisearch_types::keys::Key; | use meilisearch_types::keys::Key; | ||||||
| use meilisearch_types::settings::{Checked, Settings}; | use meilisearch_types::settings::{Checked, Settings}; | ||||||
| use serde_json::{Map, Value}; | use serde_json::{Map, Value}; | ||||||
| @@ -51,6 +51,10 @@ impl DumpWriter { | |||||||
|         KeyWriter::new(self.dir.path().to_path_buf()) |         KeyWriter::new(self.dir.path().to_path_buf()) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     pub fn create_chat_completions_settings(&self) -> Result<ChatCompletionsSettingsWriter> { | ||||||
|  |         ChatCompletionsSettingsWriter::new(self.dir.path().join("chat-completions-settings")) | ||||||
|  |     } | ||||||
|  |  | ||||||
|     pub fn create_tasks_queue(&self) -> Result<TaskWriter> { |     pub fn create_tasks_queue(&self) -> Result<TaskWriter> { | ||||||
|         TaskWriter::new(self.dir.path().join("tasks")) |         TaskWriter::new(self.dir.path().join("tasks")) | ||||||
|     } |     } | ||||||
| @@ -104,6 +108,24 @@ impl KeyWriter { | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | pub struct ChatCompletionsSettingsWriter { | ||||||
|  |     path: PathBuf, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl ChatCompletionsSettingsWriter { | ||||||
|  |     pub(crate) fn new(path: PathBuf) -> Result<Self> { | ||||||
|  |         std::fs::create_dir(&path)?; | ||||||
|  |         Ok(ChatCompletionsSettingsWriter { path }) | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     pub fn push_settings(&mut self, name: &str, settings: &ChatCompletionSettings) -> Result<()> { | ||||||
|  |         let mut settings_file = File::create(self.path.join(name).with_extension("json"))?; | ||||||
|  |         serde_json::to_writer(&mut settings_file, &settings)?; | ||||||
|  |         settings_file.flush()?; | ||||||
|  |         Ok(()) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
| pub struct TaskWriter { | pub struct TaskWriter { | ||||||
|     queue: BufWriter<File>, |     queue: BufWriter<File>, | ||||||
|     update_files: PathBuf, |     update_files: PathBuf, | ||||||
|   | |||||||
| @@ -103,6 +103,7 @@ make_enum_progress! { | |||||||
|     pub enum DumpCreationProgress { |     pub enum DumpCreationProgress { | ||||||
|         StartTheDumpCreation, |         StartTheDumpCreation, | ||||||
|         DumpTheApiKeys, |         DumpTheApiKeys, | ||||||
|  |         DumpTheChatCompletionSettings, | ||||||
|         DumpTheTasks, |         DumpTheTasks, | ||||||
|         DumpTheBatches, |         DumpTheBatches, | ||||||
|         DumpTheIndexes, |         DumpTheIndexes, | ||||||
|   | |||||||
| @@ -43,7 +43,16 @@ impl IndexScheduler { | |||||||
|  |  | ||||||
|         let rtxn = self.env.read_txn()?; |         let rtxn = self.env.read_txn()?; | ||||||
|  |  | ||||||
|         // 2. dump the tasks |         // 2. dump the chat completion settings | ||||||
|  |         // TODO should I skip the export if the chat completion has been disabled? | ||||||
|  |         progress.update_progress(DumpCreationProgress::DumpTheChatCompletionSettings); | ||||||
|  |         let mut dump_chat_completion_settings = dump.create_chat_completions_settings()?; | ||||||
|  |         for result in self.chat_settings.iter(&rtxn)? { | ||||||
|  |             let (name, chat_settings) = result?; | ||||||
|  |             dump_chat_completion_settings.push_settings(name, &chat_settings)?; | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         // 3. dump the tasks | ||||||
|         progress.update_progress(DumpCreationProgress::DumpTheTasks); |         progress.update_progress(DumpCreationProgress::DumpTheTasks); | ||||||
|         let mut dump_tasks = dump.create_tasks_queue()?; |         let mut dump_tasks = dump.create_tasks_queue()?; | ||||||
|  |  | ||||||
| @@ -81,7 +90,7 @@ impl IndexScheduler { | |||||||
|  |  | ||||||
|             let mut dump_content_file = dump_tasks.push_task(&t.into())?; |             let mut dump_content_file = dump_tasks.push_task(&t.into())?; | ||||||
|  |  | ||||||
|             // 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet. |             // 3.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet. | ||||||
|             if let Some(content_file) = content_file { |             if let Some(content_file) = content_file { | ||||||
|                 if self.scheduler.must_stop_processing.get() { |                 if self.scheduler.must_stop_processing.get() { | ||||||
|                     return Err(Error::AbortedTask); |                     return Err(Error::AbortedTask); | ||||||
| @@ -105,7 +114,7 @@ impl IndexScheduler { | |||||||
|         } |         } | ||||||
|         dump_tasks.flush()?; |         dump_tasks.flush()?; | ||||||
|  |  | ||||||
|         // 3. dump the batches |         // 4. dump the batches | ||||||
|         progress.update_progress(DumpCreationProgress::DumpTheBatches); |         progress.update_progress(DumpCreationProgress::DumpTheBatches); | ||||||
|         let mut dump_batches = dump.create_batches_queue()?; |         let mut dump_batches = dump.create_batches_queue()?; | ||||||
|  |  | ||||||
| @@ -138,7 +147,7 @@ impl IndexScheduler { | |||||||
|         } |         } | ||||||
|         dump_batches.flush()?; |         dump_batches.flush()?; | ||||||
|  |  | ||||||
|         // 4. Dump the indexes |         // 5. Dump the indexes | ||||||
|         progress.update_progress(DumpCreationProgress::DumpTheIndexes); |         progress.update_progress(DumpCreationProgress::DumpTheIndexes); | ||||||
|         let nb_indexes = self.index_mapper.index_mapping.len(&rtxn)? as u32; |         let nb_indexes = self.index_mapper.index_mapping.len(&rtxn)? as u32; | ||||||
|         let mut count = 0; |         let mut count = 0; | ||||||
| @@ -175,7 +184,7 @@ impl IndexScheduler { | |||||||
|             let documents = index |             let documents = index | ||||||
|                 .all_documents(&rtxn) |                 .all_documents(&rtxn) | ||||||
|                 .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; |                 .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; | ||||||
|             // 4.1. Dump the documents |             // 5.1. Dump the documents | ||||||
|             for ret in documents { |             for ret in documents { | ||||||
|                 if self.scheduler.must_stop_processing.get() { |                 if self.scheduler.must_stop_processing.get() { | ||||||
|                     return Err(Error::AbortedTask); |                     return Err(Error::AbortedTask); | ||||||
| @@ -233,7 +242,7 @@ impl IndexScheduler { | |||||||
|                 atomic.fetch_add(1, Ordering::Relaxed); |                 atomic.fetch_add(1, Ordering::Relaxed); | ||||||
|             } |             } | ||||||
|  |  | ||||||
|             // 4.2. Dump the settings |             // 5.2. Dump the settings | ||||||
|             let settings = meilisearch_types::settings::settings( |             let settings = meilisearch_types::settings::settings( | ||||||
|                 index, |                 index, | ||||||
|                 &rtxn, |                 &rtxn, | ||||||
| @@ -244,7 +253,7 @@ impl IndexScheduler { | |||||||
|             Ok(()) |             Ok(()) | ||||||
|         })?; |         })?; | ||||||
|  |  | ||||||
|         // 5. Dump experimental feature settings |         // 6. Dump experimental feature settings | ||||||
|         progress.update_progress(DumpCreationProgress::DumpTheExperimentalFeatures); |         progress.update_progress(DumpCreationProgress::DumpTheExperimentalFeatures); | ||||||
|         let features = self.features().runtime_features(); |         let features = self.features().runtime_features(); | ||||||
|         dump.create_experimental_features(features)?; |         dump.create_experimental_features(features)?; | ||||||
|   | |||||||
| @@ -498,14 +498,20 @@ fn import_dump( | |||||||
|         keys.push(key); |         keys.push(key); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     // 3. Import the runtime features and network |     // 3. Import the `ChatCompletionSettings`s. | ||||||
|  |     for result in dump_reader.chat_completions_settings()? { | ||||||
|  |         let (name, settings) = result?; | ||||||
|  |         index_scheduler.put_chat_settings(&name, &settings)?; | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     // 4. Import the runtime features and network | ||||||
|     let features = dump_reader.features()?.unwrap_or_default(); |     let features = dump_reader.features()?.unwrap_or_default(); | ||||||
|     index_scheduler.put_runtime_features(features)?; |     index_scheduler.put_runtime_features(features)?; | ||||||
|  |  | ||||||
|     let network = dump_reader.network()?.cloned().unwrap_or_default(); |     let network = dump_reader.network()?.cloned().unwrap_or_default(); | ||||||
|     index_scheduler.put_network(network)?; |     index_scheduler.put_network(network)?; | ||||||
|  |  | ||||||
|     // 3.1 Use all cpus to process dump if `max_indexing_threads` not configured |     // 4.1 Use all cpus to process dump if `max_indexing_threads` not configured | ||||||
|     let backup_config; |     let backup_config; | ||||||
|     let base_config = index_scheduler.indexer_config(); |     let base_config = index_scheduler.indexer_config(); | ||||||
|  |  | ||||||
| @@ -522,7 +528,7 @@ fn import_dump( | |||||||
|     // /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might |     // /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might | ||||||
|     // try to process tasks while we're trying to import the indexes. |     // try to process tasks while we're trying to import the indexes. | ||||||
|  |  | ||||||
|     // 4. Import the indexes. |     // 5. Import the indexes. | ||||||
|     for index_reader in dump_reader.indexes()? { |     for index_reader in dump_reader.indexes()? { | ||||||
|         let mut index_reader = index_reader?; |         let mut index_reader = index_reader?; | ||||||
|         let metadata = index_reader.metadata(); |         let metadata = index_reader.metadata(); | ||||||
| @@ -535,20 +541,20 @@ fn import_dump( | |||||||
|         let mut wtxn = index.write_txn()?; |         let mut wtxn = index.write_txn()?; | ||||||
|  |  | ||||||
|         let mut builder = milli::update::Settings::new(&mut wtxn, &index, indexer_config); |         let mut builder = milli::update::Settings::new(&mut wtxn, &index, indexer_config); | ||||||
|         // 4.1 Import the primary key if there is one. |         // 5.1 Import the primary key if there is one. | ||||||
|         if let Some(ref primary_key) = metadata.primary_key { |         if let Some(ref primary_key) = metadata.primary_key { | ||||||
|             builder.set_primary_key(primary_key.to_string()); |             builder.set_primary_key(primary_key.to_string()); | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         // 4.2 Import the settings. |         // 5.2 Import the settings. | ||||||
|         tracing::info!("Importing the settings."); |         tracing::info!("Importing the settings."); | ||||||
|         let settings = index_reader.settings()?; |         let settings = index_reader.settings()?; | ||||||
|         apply_settings_to_builder(&settings, &mut builder); |         apply_settings_to_builder(&settings, &mut builder); | ||||||
|         let embedder_stats: Arc<EmbedderStats> = Default::default(); |         let embedder_stats: Arc<EmbedderStats> = Default::default(); | ||||||
|         builder.execute(&|| false, &progress, embedder_stats.clone())?; |         builder.execute(&|| false, &progress, embedder_stats.clone())?; | ||||||
|  |  | ||||||
|         // 4.3 Import the documents. |         // 5.3 Import the documents. | ||||||
|         // 4.3.1 We need to recreate the grenad+obkv format accepted by the index. |         // 5.3.1 We need to recreate the grenad+obkv format accepted by the index. | ||||||
|         tracing::info!("Importing the documents."); |         tracing::info!("Importing the documents."); | ||||||
|         let file = tempfile::tempfile()?; |         let file = tempfile::tempfile()?; | ||||||
|         let mut builder = DocumentsBatchBuilder::new(BufWriter::new(file)); |         let mut builder = DocumentsBatchBuilder::new(BufWriter::new(file)); | ||||||
| @@ -559,7 +565,7 @@ fn import_dump( | |||||||
|         // This flush the content of the batch builder. |         // This flush the content of the batch builder. | ||||||
|         let file = builder.into_inner()?.into_inner()?; |         let file = builder.into_inner()?.into_inner()?; | ||||||
|  |  | ||||||
|         // 4.3.2 We feed it to the milli index. |         // 5.3.2 We feed it to the milli index. | ||||||
|         let reader = BufReader::new(file); |         let reader = BufReader::new(file); | ||||||
|         let reader = DocumentsBatchReader::from_reader(reader)?; |         let reader = DocumentsBatchReader::from_reader(reader)?; | ||||||
|  |  | ||||||
| @@ -591,15 +597,15 @@ fn import_dump( | |||||||
|         index_scheduler.refresh_index_stats(&uid)?; |         index_scheduler.refresh_index_stats(&uid)?; | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     // 5. Import the queue |     // 6. Import the queue | ||||||
|     let mut index_scheduler_dump = index_scheduler.register_dumped_task()?; |     let mut index_scheduler_dump = index_scheduler.register_dumped_task()?; | ||||||
|     // 5.1. Import the batches |     // 6.1. Import the batches | ||||||
|     for ret in dump_reader.batches()? { |     for ret in dump_reader.batches()? { | ||||||
|         let batch = ret?; |         let batch = ret?; | ||||||
|         index_scheduler_dump.register_dumped_batch(batch)?; |         index_scheduler_dump.register_dumped_batch(batch)?; | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     // 5.2. Import the tasks |     // 6.2. Import the tasks | ||||||
|     for ret in dump_reader.tasks()? { |     for ret in dump_reader.tasks()? { | ||||||
|         let (task, file) = ret?; |         let (task, file) = ret?; | ||||||
|         index_scheduler_dump.register_dumped_task(task, file)?; |         index_scheduler_dump.register_dumped_task(task, file)?; | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user