mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-25 13:06:27 +00:00 
			
		
		
		
	fix backups
* pluralize variable `backup_folder` -> `backups_folder` * change env case `MEILI_backup_folder` -> `MEILI_BACKUPS_FOLDER` * add miliseconds to backup ID to reduce colisions * fix forgoten stats synchronization
This commit is contained in:
		| @@ -26,8 +26,8 @@ impl Deref for Data { | ||||
| pub struct DataInner { | ||||
|     pub db: Arc<Database>, | ||||
|     pub db_path: String, | ||||
|     pub backup_folder: PathBuf, | ||||
|     pub backup_batch_size: usize, | ||||
|     pub dumps_folder: PathBuf, | ||||
|     pub dump_batch_size: usize, | ||||
|     pub api_keys: ApiKeys, | ||||
|     pub server_pid: u32, | ||||
|     pub http_payload_size_limit: usize, | ||||
| @@ -60,8 +60,8 @@ impl ApiKeys { | ||||
| impl Data { | ||||
|     pub fn new(opt: Opt) -> Result<Data, Box<dyn Error>> { | ||||
|         let db_path = opt.db_path.clone(); | ||||
|         let backup_folder = opt.backup_folder.clone(); | ||||
|         let backup_batch_size = opt.backup_batch_size; | ||||
|         let dumps_folder = opt.dumps_folder.clone(); | ||||
|         let dump_batch_size = opt.dump_batch_size; | ||||
|         let server_pid = std::process::id(); | ||||
|  | ||||
|         let db_opt = DatabaseOptions { | ||||
| @@ -84,8 +84,8 @@ impl Data { | ||||
|         let inner_data = DataInner { | ||||
|             db: db.clone(), | ||||
|             db_path, | ||||
|             backup_folder, | ||||
|             backup_batch_size, | ||||
|             dumps_folder, | ||||
|             dump_batch_size, | ||||
|             api_keys, | ||||
|             server_pid, | ||||
|             http_payload_size_limit, | ||||
|   | ||||
| @@ -7,7 +7,7 @@ use std::thread; | ||||
| use actix_web::web; | ||||
| use chrono::offset::Utc; | ||||
| use indexmap::IndexMap; | ||||
| use log::error; | ||||
| use log::{error, info}; | ||||
| use meilisearch_core::{MainWriter, MainReader, UpdateReader}; | ||||
| use meilisearch_core::settings::Settings; | ||||
| use meilisearch_core::update::{apply_settings_update, apply_documents_addition}; | ||||
| @@ -21,37 +21,37 @@ use crate::helpers::compression; | ||||
| use crate::routes::index; | ||||
| use crate::routes::index::IndexResponse; | ||||
| 
 | ||||
| // Mutex to share backup progress.
 | ||||
| static BACKUP_INFO: Lazy<Mutex<Option<BackupInfo>>> = Lazy::new(Mutex::default); | ||||
| // Mutex to share dump progress.
 | ||||
| static DUMP_INFO: Lazy<Mutex<Option<DumpInfo>>> = Lazy::new(Mutex::default); | ||||
| 
 | ||||
| #[derive(Debug, Serialize, Deserialize, Copy, Clone)] | ||||
| enum BackupVersion { | ||||
| enum DumpVersion { | ||||
|     V1, | ||||
| } | ||||
| 
 | ||||
| impl BackupVersion { | ||||
| impl DumpVersion { | ||||
|     const CURRENT: Self = Self::V1; | ||||
| } | ||||
| 
 | ||||
| #[derive(Debug, Serialize, Deserialize)] | ||||
| #[serde(rename_all = "camelCase")] | ||||
| pub struct BackupMetadata { | ||||
| pub struct DumpMetadata { | ||||
|     indexes: Vec<crate::routes::index::IndexResponse>, | ||||
|     db_version: String, | ||||
|     backup_version: BackupVersion, | ||||
|     dump_version: DumpVersion, | ||||
| } | ||||
| 
 | ||||
| impl BackupMetadata { | ||||
|     /// Create a BackupMetadata with the current backup version of meilisearch.
 | ||||
| impl DumpMetadata { | ||||
|     /// Create a DumpMetadata with the current dump version of meilisearch.
 | ||||
|     pub fn new(indexes: Vec<crate::routes::index::IndexResponse>, db_version: String) -> Self { | ||||
|         BackupMetadata { | ||||
|         DumpMetadata { | ||||
|             indexes, | ||||
|             db_version, | ||||
|             backup_version: BackupVersion::CURRENT, | ||||
|             dump_version: DumpVersion::CURRENT, | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     /// Extract BackupMetadata from `metadata.json` file present at provided `folder_path`
 | ||||
|     /// Extract DumpMetadata from `metadata.json` file present at provided `folder_path`
 | ||||
|     fn from_path(folder_path: &Path) -> Result<Self, Error> { | ||||
|         let path = folder_path.join("metadata.json"); | ||||
|         let file = File::open(path)?; | ||||
| @@ -61,7 +61,7 @@ impl BackupMetadata { | ||||
|         Ok(metadata) | ||||
|     } | ||||
| 
 | ||||
|     /// Write BackupMetadata in `metadata.json` file at provided `folder_path`
 | ||||
|     /// Write DumpMetadata in `metadata.json` file at provided `folder_path`
 | ||||
|     fn to_path(&self, folder_path: &Path) -> Result<(), Error> { | ||||
|         let path = folder_path.join("metadata.json"); | ||||
|         let file = File::create(path)?; | ||||
| @@ -92,10 +92,10 @@ fn settings_to_path(settings: &Settings, folder_path: &Path) -> Result<(), Error | ||||
|     Ok(()) | ||||
| } | ||||
| 
 | ||||
| /// Import settings and documents of a backup with version `BackupVersion::V1` in specified index.
 | ||||
| /// Import settings and documents of a dump with version `DumpVersion::V1` in specified index.
 | ||||
| fn import_index_v1( | ||||
|     data: &Data, | ||||
|     backup_folder: &Path, | ||||
|     dumps_folder: &Path, | ||||
|     index_uid: &str, | ||||
|     document_batch_size: usize, | ||||
|     write_txn: &mut MainWriter, | ||||
| @@ -107,12 +107,12 @@ fn import_index_v1( | ||||
|         .open_index(index_uid) | ||||
|         .ok_or(Error::index_not_found(index_uid))?; | ||||
| 
 | ||||
|     // index folder path in  backup folder
 | ||||
|     let index_path = &backup_folder.join(index_uid); | ||||
|     // index folder path in  dump folder
 | ||||
|     let index_path = &dumps_folder.join(index_uid); | ||||
| 
 | ||||
|     // extract `settings.json` file and import content
 | ||||
|     let settings = settings_from_path(&index_path)?; | ||||
|     let settings = settings.to_update().or_else(|_e| Err(Error::backup_failed()))?; | ||||
|     let settings = settings.to_update().or_else(|_e| Err(Error::dump_failed()))?; | ||||
|     apply_settings_update(write_txn, &index, settings)?; | ||||
| 
 | ||||
|     // create iterator over documents in `documents.jsonl` to make batch importation
 | ||||
| @@ -143,28 +143,35 @@ fn import_index_v1( | ||||
|         apply_documents_addition(write_txn, &index, values)?; | ||||
|     } | ||||
| 
 | ||||
|     // sync index information: stats, updated_at, last_update
 | ||||
|     if let Err(e) = crate::index_update_callback_txn(index, index_uid, data, write_txn) { | ||||
|         return Err(Error::Internal(e)); | ||||
|     } | ||||
| 
 | ||||
|     Ok(()) | ||||
| } | ||||
| 
 | ||||
| /// Import backup from `backup_folder` in database.
 | ||||
| pub fn import_backup( | ||||
| /// Import dump from `dump_path` in database.
 | ||||
| pub fn import_dump( | ||||
|     data: &Data, | ||||
|     backup_folder: &Path, | ||||
|     dump_path: &Path, | ||||
|     document_batch_size: usize, | ||||
| ) -> Result<(), Error> { | ||||
|     info!("Importing dump from {:?}...", dump_path); | ||||
| 
 | ||||
|     // create a temporary directory
 | ||||
|     let tmp_dir = TempDir::new()?; | ||||
|     let tmp_dir_path = tmp_dir.path(); | ||||
| 
 | ||||
|     // extract backup in temporary directory
 | ||||
|     compression::from_tar_gz(backup_folder, tmp_dir_path)?; | ||||
|     // extract dump in temporary directory
 | ||||
|     compression::from_tar_gz(dump_path, tmp_dir_path)?; | ||||
| 
 | ||||
|     // read backup metadata
 | ||||
|     let metadata = BackupMetadata::from_path(&tmp_dir_path)?; | ||||
|     // read dump metadata
 | ||||
|     let metadata = DumpMetadata::from_path(&tmp_dir_path)?; | ||||
| 
 | ||||
|     // choose importation function from BackupVersion of metadata
 | ||||
|     let import_index = match metadata.backup_version { | ||||
|         BackupVersion::V1 => import_index_v1, | ||||
|     // choose importation function from DumpVersion of metadata
 | ||||
|     let import_index = match metadata.dump_version { | ||||
|         DumpVersion::V1 => import_index_v1, | ||||
|     }; | ||||
| 
 | ||||
|     // remove indexes which have same `uid` than indexes to import and create empty indexes
 | ||||
| @@ -184,78 +191,79 @@ pub fn import_backup( | ||||
|         Ok(()) | ||||
|     })?; | ||||
| 
 | ||||
|     info!("Dump importation from {:?} succeed", dump_path); | ||||
|     Ok(()) | ||||
| } | ||||
| 
 | ||||
| #[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] | ||||
| #[serde(rename_all = "snake_case")] | ||||
| pub enum BackupStatus { | ||||
| pub enum DumpStatus { | ||||
|     Done, | ||||
|     Processing, | ||||
|     BackupProcessFailed, | ||||
|     DumpProcessFailed, | ||||
| } | ||||
| 
 | ||||
| #[derive(Debug, Serialize, Deserialize, Clone)] | ||||
| #[serde(rename_all = "camelCase")] | ||||
| pub struct BackupInfo { | ||||
| pub struct DumpInfo { | ||||
|     pub uid: String, | ||||
|     pub status: BackupStatus, | ||||
|     pub status: DumpStatus, | ||||
|     #[serde(skip_serializing_if = "Option::is_none")] | ||||
|     pub error: Option<String>, | ||||
| } | ||||
| 
 | ||||
| impl BackupInfo { | ||||
|     pub fn new(uid: String, status: BackupStatus) -> Self { | ||||
| impl DumpInfo { | ||||
|     pub fn new(uid: String, status: DumpStatus) -> Self { | ||||
|         Self { uid, status, error: None } | ||||
|     } | ||||
| 
 | ||||
|     pub fn with_error(mut self, error: String) -> Self { | ||||
|         self.status = BackupStatus::BackupProcessFailed; | ||||
|         self.status = DumpStatus::DumpProcessFailed; | ||||
|         self.error = Some(error); | ||||
| 
 | ||||
|         self | ||||
|     } | ||||
| 
 | ||||
|     pub fn backup_already_in_progress(&self) -> bool { | ||||
|         self.status == BackupStatus::Processing | ||||
|     pub fn dump_already_in_progress(&self) -> bool { | ||||
|         self.status == DumpStatus::Processing | ||||
|     } | ||||
| 
 | ||||
|     pub fn get_current() -> Option<Self> { | ||||
|         BACKUP_INFO.lock().unwrap().clone() | ||||
|         DUMP_INFO.lock().unwrap().clone() | ||||
|     } | ||||
| 
 | ||||
|     pub fn set_current(&self) { | ||||
|         *BACKUP_INFO.lock().unwrap() = Some(self.clone()); | ||||
|         *DUMP_INFO.lock().unwrap() = Some(self.clone()); | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| /// Generate uid from creation date
 | ||||
| fn generate_uid() -> String { | ||||
|     Utc::now().format("%Y%m%d-%H%M%S").to_string() | ||||
|     Utc::now().format("%Y%m%d-%H%M%S%3f").to_string() | ||||
| } | ||||
| 
 | ||||
| /// Infer backup_folder from backup_uid
 | ||||
| pub fn compressed_backup_folder(backup_folder: &Path, backup_uid: &str) -> PathBuf { | ||||
|     backup_folder.join(format!("{}.tar.gz", backup_uid)) | ||||
| /// Infer dumps_folder from dump_uid
 | ||||
| pub fn compressed_dumps_folder(dumps_folder: &Path, dump_uid: &str) -> PathBuf { | ||||
|     dumps_folder.join(format!("{}.tar.gz", dump_uid)) | ||||
| } | ||||
| 
 | ||||
| /// Write metadata in backup
 | ||||
| fn backup_metadata(data: &web::Data<Data>, folder_path: &Path, indexes: Vec<IndexResponse>) -> Result<(), Error> { | ||||
| /// Write metadata in dump
 | ||||
| fn dump_metadata(data: &web::Data<Data>, folder_path: &Path, indexes: Vec<IndexResponse>) -> Result<(), Error> { | ||||
|     let (db_major, db_minor, db_patch) = data.db.version(); | ||||
|     let metadata = BackupMetadata::new(indexes, format!("{}.{}.{}", db_major, db_minor, db_patch)); | ||||
|     let metadata = DumpMetadata::new(indexes, format!("{}.{}.{}", db_major, db_minor, db_patch)); | ||||
| 
 | ||||
|     metadata.to_path(folder_path) | ||||
| } | ||||
| 
 | ||||
| /// Export settings of provided index in backup
 | ||||
| fn backup_index_settings(data: &web::Data<Data>, reader: &MainReader, folder_path: &Path, index_uid: &str) -> Result<(), Error> { | ||||
| /// Export settings of provided index in dump
 | ||||
| fn dump_index_settings(data: &web::Data<Data>, reader: &MainReader, folder_path: &Path, index_uid: &str) -> Result<(), Error> { | ||||
|     let settings = crate::routes::setting::get_all_sync(data, reader, index_uid)?; | ||||
| 
 | ||||
|     settings_to_path(&settings, folder_path) | ||||
| } | ||||
| 
 | ||||
| /// Export updates of provided index in backup
 | ||||
| fn backup_index_updates(data: &web::Data<Data>, reader: &UpdateReader, folder_path: &Path, index_uid: &str) -> Result<(), Error> { | ||||
| /// Export updates of provided index in dump
 | ||||
| fn dump_index_updates(data: &web::Data<Data>, reader: &UpdateReader, folder_path: &Path, index_uid: &str) -> Result<(), Error> { | ||||
|     let updates_path = folder_path.join("updates.jsonl"); | ||||
|     let updates = crate::routes::index::get_all_updates_status_sync(data, reader, index_uid)?; | ||||
| 
 | ||||
| @@ -269,16 +277,16 @@ fn backup_index_updates(data: &web::Data<Data>, reader: &UpdateReader, folder_pa | ||||
|     Ok(()) | ||||
| } | ||||
| 
 | ||||
| /// Export documents of provided index in backup
 | ||||
| fn backup_index_documents(data: &web::Data<Data>, reader: &MainReader, folder_path: &Path, index_uid: &str) -> Result<(), Error> { | ||||
| /// Export documents of provided index in dump
 | ||||
| fn dump_index_documents(data: &web::Data<Data>, reader: &MainReader, folder_path: &Path, index_uid: &str) -> Result<(), Error> { | ||||
|     let documents_path = folder_path.join("documents.jsonl"); | ||||
|     let file = File::create(documents_path)?; | ||||
|     let backup_batch_size = data.backup_batch_size; | ||||
|     let dump_batch_size = data.dump_batch_size; | ||||
| 
 | ||||
|     let mut offset = 0; | ||||
|     loop { | ||||
|         let documents = crate::routes::document::get_all_documents_sync(data, reader, index_uid, offset, backup_batch_size, None)?; | ||||
|         if documents.len() == 0 { break; } else { offset += backup_batch_size; } | ||||
|         let documents = crate::routes::document::get_all_documents_sync(data, reader, index_uid, offset, dump_batch_size, None)?; | ||||
|         if documents.len() == 0 { break; } else { offset += dump_batch_size; } | ||||
| 
 | ||||
|         for document in documents { | ||||
|             serde_json::to_writer(&file, &document)?; | ||||
| @@ -290,20 +298,20 @@ fn backup_index_documents(data: &web::Data<Data>, reader: &MainReader, folder_pa | ||||
| } | ||||
| 
 | ||||
| /// Write error with a context.
 | ||||
| fn fail_backup_process<E: std::error::Error>(backup_info: BackupInfo, context: &str, error: E) { | ||||
|         let error = format!("Something went wrong during backup process: {}; {}", context, error); | ||||
| fn fail_dump_process<E: std::error::Error>(dump_info: DumpInfo, context: &str, error: E) { | ||||
|         let error = format!("Something went wrong during dump process: {}; {}", context, error); | ||||
|         
 | ||||
|         error!("{}", &error); | ||||
|         backup_info.with_error(error).set_current(); | ||||
|         dump_info.with_error(error).set_current(); | ||||
| } | ||||
| 
 | ||||
| /// Main function of backup.
 | ||||
| fn backup_process(data: web::Data<Data>, backup_folder: PathBuf, backup_info: BackupInfo) { | ||||
| /// Main function of dump.
 | ||||
| fn dump_process(data: web::Data<Data>, dumps_folder: PathBuf, dump_info: DumpInfo) { | ||||
|     // open read transaction on Update
 | ||||
|     let update_reader = match data.db.update_read_txn() { | ||||
|         Ok(r) => r, | ||||
|         Err(e) => { | ||||
|             fail_backup_process(backup_info, "creating RO transaction on updates", e); | ||||
|             fail_dump_process(dump_info, "creating RO transaction on updates", e); | ||||
|             return ; | ||||
|         } | ||||
|     }; | ||||
| @@ -312,7 +320,7 @@ fn backup_process(data: web::Data<Data>, backup_folder: PathBuf, backup_info: Ba | ||||
|     let main_reader = match data.db.main_read_txn() { | ||||
|         Ok(r) => r, | ||||
|         Err(e) => { | ||||
|             fail_backup_process(backup_info, "creating RO transaction on main", e); | ||||
|             fail_dump_process(dump_info, "creating RO transaction on main", e); | ||||
|             return ; | ||||
|         } | ||||
|     }; | ||||
| @@ -321,7 +329,7 @@ fn backup_process(data: web::Data<Data>, backup_folder: PathBuf, backup_info: Ba | ||||
|     let tmp_dir = match TempDir::new() { | ||||
|         Ok(tmp_dir) => tmp_dir, | ||||
|         Err(e) => { | ||||
|             fail_backup_process(backup_info, "creating temporary directory", e); | ||||
|             fail_dump_process(dump_info, "creating temporary directory", e); | ||||
|             return ; | ||||
|         } | ||||
|     }; | ||||
| @@ -331,14 +339,14 @@ fn backup_process(data: web::Data<Data>, backup_folder: PathBuf, backup_info: Ba | ||||
|     let indexes = match crate::routes::index::list_indexes_sync(&data, &main_reader) { | ||||
|         Ok(indexes) => indexes, | ||||
|         Err(e) => { | ||||
|             fail_backup_process(backup_info, "listing indexes", e); | ||||
|             fail_dump_process(dump_info, "listing indexes", e); | ||||
|             return ; | ||||
|         } | ||||
|     }; | ||||
| 
 | ||||
|     // create metadata
 | ||||
|     if let Err(e) = backup_metadata(&data, &tmp_dir_path, indexes.clone()) { | ||||
|         fail_backup_process(backup_info, "generating metadata", e); | ||||
|     if let Err(e) = dump_metadata(&data, &tmp_dir_path, indexes.clone()) { | ||||
|         fail_dump_process(dump_info, "generating metadata", e); | ||||
|         return ; | ||||
|     } | ||||
| 
 | ||||
| @@ -348,68 +356,68 @@ fn backup_process(data: web::Data<Data>, backup_folder: PathBuf, backup_info: Ba | ||||
| 
 | ||||
|         // create index sub-dircetory
 | ||||
|         if let Err(e) = create_dir_all(&index_path) { | ||||
|             fail_backup_process(backup_info, &format!("creating directory for index {}", &index.uid), e); | ||||
|             fail_dump_process(dump_info, &format!("creating directory for index {}", &index.uid), e); | ||||
|             return ; | ||||
|         } | ||||
| 
 | ||||
|         // export settings
 | ||||
|         if let Err(e) = backup_index_settings(&data, &main_reader, &index_path, &index.uid) { | ||||
|             fail_backup_process(backup_info, &format!("generating settings for index {}", &index.uid), e); | ||||
|         if let Err(e) = dump_index_settings(&data, &main_reader, &index_path, &index.uid) { | ||||
|             fail_dump_process(dump_info, &format!("generating settings for index {}", &index.uid), e); | ||||
|             return ; | ||||
|         } | ||||
| 
 | ||||
|         // export documents
 | ||||
|         if let Err(e) = backup_index_documents(&data, &main_reader, &index_path, &index.uid) { | ||||
|             fail_backup_process(backup_info, &format!("generating documents for index {}", &index.uid), e); | ||||
|         if let Err(e) = dump_index_documents(&data, &main_reader, &index_path, &index.uid) { | ||||
|             fail_dump_process(dump_info, &format!("generating documents for index {}", &index.uid), e); | ||||
|             return ; | ||||
|         } | ||||
| 
 | ||||
|         // export updates
 | ||||
|         if let Err(e) = backup_index_updates(&data, &update_reader, &index_path, &index.uid) { | ||||
|             fail_backup_process(backup_info, &format!("generating updates for index {}", &index.uid), e); | ||||
|         if let Err(e) = dump_index_updates(&data, &update_reader, &index_path, &index.uid) { | ||||
|             fail_dump_process(dump_info, &format!("generating updates for index {}", &index.uid), e); | ||||
|             return ; | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     // compress backup in a file named `{backup_uid}.tar.gz` in `backup_folder`
 | ||||
|     if let Err(e) = crate::helpers::compression::to_tar_gz(&tmp_dir_path, &compressed_backup_folder(&backup_folder, &backup_info.uid)) { | ||||
|         fail_backup_process(backup_info, "compressing backup", e); | ||||
|     // compress dump in a file named `{dump_uid}.tar.gz` in `dumps_folder`
 | ||||
|     if let Err(e) = crate::helpers::compression::to_tar_gz(&tmp_dir_path, &compressed_dumps_folder(&dumps_folder, &dump_info.uid)) { | ||||
|         fail_dump_process(dump_info, "compressing dump", e); | ||||
|         return ; | ||||
|     } | ||||
| 
 | ||||
|     // update backup info to `done`
 | ||||
|     let resume = BackupInfo::new( | ||||
|         backup_info.uid, | ||||
|         BackupStatus::Done | ||||
|     // update dump info to `done`
 | ||||
|     let resume = DumpInfo::new( | ||||
|         dump_info.uid, | ||||
|         DumpStatus::Done | ||||
|     ); | ||||
| 
 | ||||
|     resume.set_current(); | ||||
| } | ||||
| 
 | ||||
| pub fn init_backup_process(data: &web::Data<Data>, backup_folder: &Path) -> Result<BackupInfo, Error> { | ||||
|     create_dir_all(backup_folder).or(Err(Error::backup_failed()))?; | ||||
| pub fn init_dump_process(data: &web::Data<Data>, dumps_folder: &Path) -> Result<DumpInfo, Error> { | ||||
|     create_dir_all(dumps_folder).or(Err(Error::dump_failed()))?; | ||||
| 
 | ||||
|     // check if a backup is already in progress
 | ||||
|     if let Some(resume) = BackupInfo::get_current() { | ||||
|         if resume.backup_already_in_progress() { | ||||
|             return Err(Error::backup_conflict()) | ||||
|     // check if a dump is already in progress
 | ||||
|     if let Some(resume) = DumpInfo::get_current() { | ||||
|         if resume.dump_already_in_progress() { | ||||
|             return Err(Error::dump_conflict()) | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     // generate a new backup info
 | ||||
|     let info = BackupInfo::new( | ||||
|     // generate a new dump info
 | ||||
|     let info = DumpInfo::new( | ||||
|         generate_uid(), | ||||
|         BackupStatus::Processing | ||||
|         DumpStatus::Processing | ||||
|     ); | ||||
| 
 | ||||
|     info.set_current(); | ||||
| 
 | ||||
|     let data = data.clone(); | ||||
|     let backup_folder = backup_folder.to_path_buf(); | ||||
|     let dumps_folder = dumps_folder.to_path_buf(); | ||||
|     let info_cloned = info.clone(); | ||||
|     // run backup process in a new thread
 | ||||
|     // run dump process in a new thread
 | ||||
|     thread::spawn(move || 
 | ||||
|         backup_process(data, backup_folder, info_cloned) | ||||
|         dump_process(data, dumps_folder, info_cloned) | ||||
|     ); | ||||
| 
 | ||||
|     Ok(info) | ||||
| @@ -53,8 +53,8 @@ pub enum Error { | ||||
|     SearchDocuments(String), | ||||
|     PayloadTooLarge, | ||||
|     UnsupportedMediaType, | ||||
|     BackupAlreadyInProgress, | ||||
|     BackupProcessFailed, | ||||
|     DumpAlreadyInProgress, | ||||
|     DumpProcessFailed, | ||||
| } | ||||
|  | ||||
| impl error::Error for Error {} | ||||
| @@ -80,8 +80,8 @@ impl ErrorCode for Error { | ||||
|             SearchDocuments(_) => Code::SearchDocuments, | ||||
|             PayloadTooLarge => Code::PayloadTooLarge, | ||||
|             UnsupportedMediaType => Code::UnsupportedMediaType, | ||||
|             BackupAlreadyInProgress => Code::BackupAlreadyInProgress, | ||||
|             BackupProcessFailed => Code::BackupProcessFailed, | ||||
|             DumpAlreadyInProgress => Code::DumpAlreadyInProgress, | ||||
|             DumpProcessFailed => Code::DumpProcessFailed, | ||||
|         } | ||||
|     } | ||||
| } | ||||
| @@ -185,12 +185,12 @@ impl Error { | ||||
|         Error::SearchDocuments(err.to_string()) | ||||
|     } | ||||
|  | ||||
|     pub fn backup_conflict() -> Error { | ||||
|         Error::BackupAlreadyInProgress | ||||
|     pub fn dump_conflict() -> Error { | ||||
|         Error::DumpAlreadyInProgress | ||||
|     } | ||||
|  | ||||
|     pub fn backup_failed() -> Error { | ||||
|         Error::BackupProcessFailed | ||||
|     pub fn dump_failed() -> Error { | ||||
|         Error::DumpProcessFailed | ||||
|     } | ||||
| } | ||||
|  | ||||
| @@ -214,8 +214,8 @@ impl fmt::Display for Error { | ||||
|             Self::SearchDocuments(err) => write!(f, "Impossible to search documents; {}", err), | ||||
|             Self::PayloadTooLarge => f.write_str("Payload too large"), | ||||
|             Self::UnsupportedMediaType => f.write_str("Unsupported media type"), | ||||
|             Self::BackupAlreadyInProgress => f.write_str("Another backup is already in progress"), | ||||
|             Self::BackupProcessFailed => f.write_str("Backup process failed"), | ||||
|             Self::DumpAlreadyInProgress => f.write_str("Another dump is already in progress"), | ||||
|             Self::DumpProcessFailed => f.write_str("Dump process failed"), | ||||
|         } | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -8,7 +8,7 @@ pub mod option; | ||||
| pub mod routes; | ||||
| pub mod analytics; | ||||
| pub mod snapshot; | ||||
| pub mod backup; | ||||
| pub mod dump; | ||||
|  | ||||
| use actix_http::Error; | ||||
| use actix_service::ServiceFactory; | ||||
| @@ -16,7 +16,7 @@ use actix_web::{dev, web, App}; | ||||
| use chrono::Utc; | ||||
| use log::error; | ||||
|  | ||||
| use meilisearch_core::ProcessedUpdateResult; | ||||
| use meilisearch_core::{Index, MainWriter, ProcessedUpdateResult}; | ||||
|  | ||||
| pub use option::Opt; | ||||
| pub use self::data::Data; | ||||
| @@ -57,7 +57,23 @@ pub fn create_app( | ||||
|         .configure(routes::health::services) | ||||
|         .configure(routes::stats::services) | ||||
|         .configure(routes::key::services) | ||||
|         .configure(routes::backup::services) | ||||
|         .configure(routes::dump::services) | ||||
| } | ||||
|  | ||||
| pub fn index_update_callback_txn(index: Index, index_uid: &str, data: &Data, mut writer: &mut MainWriter) -> Result<(), String> { | ||||
|     if let Err(e) = data.db.compute_stats(&mut writer, index_uid) { | ||||
|         return Err(format!("Impossible to compute stats; {}", e)); | ||||
|     } | ||||
|  | ||||
|     if let Err(e) = data.db.set_last_update(&mut writer, &Utc::now()) { | ||||
|         return Err(format!("Impossible to update last_update; {}", e)); | ||||
|     } | ||||
|  | ||||
|     if let Err(e) = index.main.put_updated_at(&mut writer) { | ||||
|         return Err(format!("Impossible to update updated_at; {}", e)); | ||||
|     } | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| pub fn index_update_callback(index_uid: &str, data: &Data, status: ProcessedUpdateResult) { | ||||
| @@ -65,20 +81,13 @@ pub fn index_update_callback(index_uid: &str, data: &Data, status: ProcessedUpda | ||||
|         return; | ||||
|     } | ||||
|  | ||||
|     if let Some(index) = data.db.open_index(&index_uid) { | ||||
|     if let Some(index) = data.db.open_index(index_uid) { | ||||
|         let db = &data.db; | ||||
|         let res = db.main_write::<_, _, ResponseError>(|mut writer| { | ||||
|             if let Err(e) = data.db.compute_stats(&mut writer, &index_uid) { | ||||
|                 error!("Impossible to compute stats; {}", e) | ||||
|             if let Err(e) = index_update_callback_txn(index, index_uid, data, &mut writer) { | ||||
|                 error!("{}", e); | ||||
|             } | ||||
|  | ||||
|             if let Err(e) = data.db.set_last_update(&mut writer, &Utc::now()) { | ||||
|                 error!("Impossible to update last_update; {}", e) | ||||
|             } | ||||
|  | ||||
|             if let Err(e) = index.main.put_updated_at(&mut writer) { | ||||
|                 error!("Impossible to update updated_at; {}", e) | ||||
|             } | ||||
|             Ok(()) | ||||
|         }); | ||||
|         match res { | ||||
|   | ||||
| @@ -6,7 +6,7 @@ use main_error::MainError; | ||||
| use meilisearch_http::helpers::NormalizePath; | ||||
| use meilisearch_http::{create_app, index_update_callback, Data, Opt}; | ||||
| use structopt::StructOpt; | ||||
| use meilisearch_http::{snapshot, backup}; | ||||
| use meilisearch_http::{snapshot, dump}; | ||||
|  | ||||
| mod analytics; | ||||
|  | ||||
| @@ -70,8 +70,8 @@ async fn main() -> Result<(), MainError> { | ||||
|     })); | ||||
|  | ||||
|  | ||||
|     if let Some(path) = &opt.import_backup { | ||||
|         backup::import_backup(&data, path, opt.backup_batch_size)?; | ||||
|     if let Some(path) = &opt.import_dump { | ||||
|         dump::import_dump(&data, path, opt.dump_batch_size)?; | ||||
|     } | ||||
|  | ||||
|     if let Some(path) = &opt.snapshot_path { | ||||
|   | ||||
| @@ -116,17 +116,17 @@ pub struct Opt { | ||||
|     #[structopt(long, requires = "snapshot-path", env = "MEILI_SNAPSHOT_INTERVAL_SEC")] | ||||
|     pub snapshot_interval_sec: Option<u64>, | ||||
|  | ||||
|     /// Folder where backups are created when the backup route is called. | ||||
|     #[structopt(long, env = "MEILI_backup_folder", default_value = "backups/")] | ||||
|     pub backup_folder: PathBuf, | ||||
|     /// Folder where dumps are created when the dump route is called. | ||||
|     #[structopt(long, env = "MEILI_DUMPS_FOLDER", default_value = "dumps/")] | ||||
|     pub dumps_folder: PathBuf, | ||||
|  | ||||
|     /// Import a backup from the specified path, must be a `.tar.gz` file. | ||||
|     #[structopt(long, env = "MEILI_IMPORT_BACKUP", conflicts_with = "load-from-snapshot")] | ||||
|     pub import_backup: Option<PathBuf>, | ||||
|     /// Import a dump from the specified path, must be a `.tar.gz` file. | ||||
|     #[structopt(long, env = "MEILI_IMPORT_DUMP", conflicts_with = "load-from-snapshot")] | ||||
|     pub import_dump: Option<PathBuf>, | ||||
|  | ||||
|     /// The batch size used in the importation process, the bigger it is the faster the backup is created. | ||||
|     #[structopt(long, env = "MEILI_BACKUP_BATCH_SIZE", default_value = "1024")] | ||||
|     pub backup_batch_size: usize, | ||||
|     /// The batch size used in the importation process, the bigger it is the faster the dump is created. | ||||
|     #[structopt(long, env = "MEILI_DUMP_BATCH_SIZE", default_value = "1024")] | ||||
|     pub dump_batch_size: usize, | ||||
| } | ||||
|  | ||||
| impl Opt { | ||||
|   | ||||
| @@ -1,64 +0,0 @@ | ||||
| use std::fs::File; | ||||
| use std::path::Path; | ||||
|  | ||||
| use actix_web::{get, post}; | ||||
| use actix_web::{HttpResponse, web}; | ||||
| use serde::{Deserialize, Serialize}; | ||||
|  | ||||
| use crate::backup::{BackupInfo, BackupStatus, compressed_backup_folder, init_backup_process}; | ||||
| use crate::Data; | ||||
| use crate::error::{Error, ResponseError}; | ||||
| use crate::helpers::Authentication; | ||||
|  | ||||
| pub fn services(cfg: &mut web::ServiceConfig) { | ||||
|     cfg.service(trigger_backup) | ||||
|         .service(get_backup_status); | ||||
| } | ||||
|  | ||||
| #[post("/backups", wrap = "Authentication::Private")] | ||||
| async fn trigger_backup( | ||||
|     data: web::Data<Data>, | ||||
| ) -> Result<HttpResponse, ResponseError> { | ||||
|     let backup_folder = Path::new(&data.backup_folder); | ||||
|     match init_backup_process(&data, &backup_folder) { | ||||
|         Ok(resume) => Ok(HttpResponse::Accepted().json(resume)), | ||||
|         Err(e) => Err(e.into()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[derive(Debug, Serialize)] | ||||
| #[serde(rename_all = "camelCase")] | ||||
| struct BackupStatusResponse { | ||||
|     status: String, | ||||
| } | ||||
|  | ||||
| #[derive(Deserialize)] | ||||
| struct BackupParam { | ||||
|     backup_uid: String, | ||||
| } | ||||
|  | ||||
| #[get("/backups/{backup_uid}/status", wrap = "Authentication::Private")] | ||||
| async fn get_backup_status( | ||||
|     data: web::Data<Data>, | ||||
|     path: web::Path<BackupParam>, | ||||
| ) -> Result<HttpResponse, ResponseError> { | ||||
|     let backup_folder = Path::new(&data.backup_folder); | ||||
|     let backup_uid = &path.backup_uid; | ||||
|  | ||||
|     if let Some(resume) = BackupInfo::get_current() { | ||||
|         if &resume.uid == backup_uid { | ||||
|             return Ok(HttpResponse::Ok().json(resume)); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     if File::open(compressed_backup_folder(Path::new(backup_folder), backup_uid)).is_ok() { | ||||
|         let resume = BackupInfo::new( | ||||
|             backup_uid.into(), | ||||
|             BackupStatus::Done | ||||
|         ); | ||||
|  | ||||
|         Ok(HttpResponse::Ok().json(resume)) | ||||
|     } else { | ||||
|         Err(Error::not_found("backup does not exist").into()) | ||||
|     } | ||||
| } | ||||
							
								
								
									
										64
									
								
								meilisearch-http/src/routes/dump.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										64
									
								
								meilisearch-http/src/routes/dump.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,64 @@ | ||||
| use std::fs::File; | ||||
| use std::path::Path; | ||||
|  | ||||
| use actix_web::{get, post}; | ||||
| use actix_web::{HttpResponse, web}; | ||||
| use serde::{Deserialize, Serialize}; | ||||
|  | ||||
| use crate::dump::{DumpInfo, DumpStatus, compressed_dumps_folder, init_dump_process}; | ||||
| use crate::Data; | ||||
| use crate::error::{Error, ResponseError}; | ||||
| use crate::helpers::Authentication; | ||||
|  | ||||
| pub fn services(cfg: &mut web::ServiceConfig) { | ||||
|     cfg.service(trigger_dump) | ||||
|         .service(get_dump_status); | ||||
| } | ||||
|  | ||||
| #[post("/dumps", wrap = "Authentication::Private")] | ||||
| async fn trigger_dump( | ||||
|     data: web::Data<Data>, | ||||
| ) -> Result<HttpResponse, ResponseError> { | ||||
|     let dumps_folder = Path::new(&data.dumps_folder); | ||||
|     match init_dump_process(&data, &dumps_folder) { | ||||
|         Ok(resume) => Ok(HttpResponse::Accepted().json(resume)), | ||||
|         Err(e) => Err(e.into()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[derive(Debug, Serialize)] | ||||
| #[serde(rename_all = "camelCase")] | ||||
| struct DumpStatusResponse { | ||||
|     status: String, | ||||
| } | ||||
|  | ||||
| #[derive(Deserialize)] | ||||
| struct DumpParam { | ||||
|     dump_uid: String, | ||||
| } | ||||
|  | ||||
| #[get("/dumps/{dump_uid}/status", wrap = "Authentication::Private")] | ||||
| async fn get_dump_status( | ||||
|     data: web::Data<Data>, | ||||
|     path: web::Path<DumpParam>, | ||||
| ) -> Result<HttpResponse, ResponseError> { | ||||
|     let dumps_folder = Path::new(&data.dumps_folder); | ||||
|     let dump_uid = &path.dump_uid; | ||||
|  | ||||
|     if let Some(resume) = DumpInfo::get_current() { | ||||
|         if &resume.uid == dump_uid { | ||||
|             return Ok(HttpResponse::Ok().json(resume)); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     if File::open(compressed_dumps_folder(Path::new(dumps_folder), dump_uid)).is_ok() { | ||||
|         let resume = DumpInfo::new( | ||||
|             dump_uid.into(), | ||||
|             DumpStatus::Done | ||||
|         ); | ||||
|  | ||||
|         Ok(HttpResponse::Ok().json(resume)) | ||||
|     } else { | ||||
|         Err(Error::not_found("dump does not exist").into()) | ||||
|     } | ||||
| } | ||||
| @@ -10,7 +10,7 @@ pub mod setting; | ||||
| pub mod stats; | ||||
| pub mod stop_words; | ||||
| pub mod synonym; | ||||
| pub mod backup; | ||||
| pub mod dump; | ||||
|  | ||||
| #[derive(Deserialize)] | ||||
| pub struct IndexParam { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user