mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-25 21:16:28 +00:00 
			
		
		
		
	snapshot indexes
This commit is contained in:
		| @@ -8,7 +8,7 @@ use async_stream::stream; | ||||
| use chrono::{DateTime, Utc}; | ||||
| use futures::pin_mut; | ||||
| use futures::stream::StreamExt; | ||||
| use heed::EnvOpenOptions; | ||||
| use heed::{CompactionOption, EnvOpenOptions}; | ||||
| use log::debug; | ||||
| use serde::{Deserialize, Serialize}; | ||||
| use thiserror::Error; | ||||
| @@ -103,6 +103,11 @@ enum IndexMsg { | ||||
|         index_settings: IndexSettings, | ||||
|         ret: oneshot::Sender<Result<IndexMeta>>, | ||||
|     }, | ||||
|     Snapshot { | ||||
|         uuids: Vec<Uuid>, | ||||
|         path: PathBuf, | ||||
|         ret: oneshot::Sender<Result<()>>, | ||||
|     } | ||||
| } | ||||
|  | ||||
| struct IndexActor<S> { | ||||
| @@ -251,6 +256,9 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> { | ||||
|             } => { | ||||
|                 let _ = ret.send(self.handle_update_index(uuid, index_settings).await); | ||||
|             } | ||||
|             Snapshot { uuids, path, ret } => { | ||||
|                 let _ = ret.send(self.handle_snapshot(uuids, path).await); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
| @@ -403,6 +411,39 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> { | ||||
|         .await | ||||
|         .map_err(|e| IndexError::Error(e.into()))? | ||||
|     } | ||||
|  | ||||
|     /// Snapshots the indexes identified by `uuids` into `<path>/indexes`. | ||||
|     /// | ||||
|     /// The `indexes` directory is created if it is missing. Every uuid that resolves | ||||
|     /// to an index in the store is copied, with compaction enabled, to | ||||
|     /// `indexes/index-<uuid>` on a blocking task; uuids the store returns `None` | ||||
|     /// for are skipped. | ||||
|     /// | ||||
|     /// # Errors | ||||
|     /// | ||||
|     /// Returns the first failure among: creating the directory, looking an index up | ||||
|     /// in the store, joining a copy task, or the copy itself. | ||||
|     async fn handle_snapshot(&self, uuids: Vec<Uuid>, mut path: PathBuf) -> Result<()> { | ||||
|         use tokio::fs::create_dir_all; | ||||
|  | ||||
|         path.push("indexes"); | ||||
|         // Diagnostics go through the `log` facade rather than straight to stdout. | ||||
|         debug!("performing index snapshot in {:?}", path); | ||||
|         create_dir_all(&path) | ||||
|             .await | ||||
|             .map_err(|e| IndexError::Error(e.into()))?; | ||||
|  | ||||
|         // At most one copy task is spawned per requested uuid. | ||||
|         let mut handles = Vec::with_capacity(uuids.len()); | ||||
|         for uuid in uuids { | ||||
|             if let Some(index) = self.store.get(uuid).await? { | ||||
|                 let index_path = path.join(format!("index-{}", uuid)); | ||||
|                 let handle = spawn_blocking(move || -> anyhow::Result<()> { | ||||
|                     // Get write txn to wait for ongoing write transaction before snapshot. | ||||
|                     let _txn = index.write_txn()?; | ||||
|                     index.env.copy_to_path(index_path, CompactionOption::Enabled)?; | ||||
|                     Ok(()) | ||||
|                 }); | ||||
|                 handles.push(handle); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         // The copies were all started above; wait for each one in turn. | ||||
|         for handle in handles { | ||||
|             handle | ||||
|                 .await | ||||
|                 .map_err(|e| IndexError::Error(e.into()))? | ||||
|                 .map_err(|e| IndexError::Error(e.into()))?; | ||||
|         } | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[derive(Clone)] | ||||
| @@ -525,6 +566,17 @@ impl IndexActorHandle { | ||||
|         let _ = self.read_sender.send(msg).await; | ||||
|         Ok(receiver.await.expect("IndexActor has been killed")?) | ||||
|     } | ||||
|  | ||||
|     /// Sends a `Snapshot` request for `uuids` and `path` to the index actor and | ||||
|     /// waits for its answer. | ||||
|     /// | ||||
|     /// # Panics | ||||
|     /// | ||||
|     /// Panics if the reply channel is closed before an answer arrives. | ||||
|     pub async fn snapshot(&self, uuids: Vec<Uuid>, path: PathBuf) -> Result<()> { | ||||
|         let (ret, receiver) = oneshot::channel(); | ||||
|         // A failed send is ignored here; it shows up as a closed reply channel below. | ||||
|         let _ = self | ||||
|             .read_sender | ||||
|             .send(IndexMsg::Snapshot { uuids, path, ret }) | ||||
|             .await; | ||||
|         let reply = receiver.await.expect("IndexActor has been killed"); | ||||
|         Ok(reply?) | ||||
|     } | ||||
| } | ||||
|  | ||||
| struct HeedIndexStore { | ||||
|   | ||||
| @@ -1,7 +1,9 @@ | ||||
| use std::path::PathBuf; | ||||
| use std::time::Duration; | ||||
| use std::fs::create_dir_all; | ||||
|  | ||||
| use tokio::time::interval; | ||||
| use uuid::Uuid; | ||||
|  | ||||
| use super::index_actor::IndexActorHandle; | ||||
| use super::update_actor::UpdateActorHandle; | ||||
| @@ -38,11 +40,20 @@ impl<B> SnapshotService<B> { | ||||
|  | ||||
|         loop { | ||||
|             interval.tick().await; | ||||
|             self.perform_snapshot().await; | ||||
|             self.perform_snapshot().await.unwrap(); | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     async fn perform_snapshot(&self) { | ||||
|         println!("performing snapshot in {:?}", self.snapshot_path); | ||||
|     /// Snapshots the uuid resolver, then the update and index actors, into a fresh | ||||
|     /// `tmp-<uuid>` directory under `snapshot_path`. | ||||
|     /// | ||||
|     /// # Errors | ||||
|     /// | ||||
|     /// Fails if the temporary directory cannot be created or if any of the three | ||||
|     /// snapshot requests reports an error. | ||||
|     async fn perform_snapshot(&self) -> anyhow::Result<()> { | ||||
|         let temp_snapshot_path = self | ||||
|             .snapshot_path | ||||
|             .join(format!("tmp-{}", Uuid::new_v4())); | ||||
|         create_dir_all(&temp_snapshot_path)?; | ||||
|         // The resolver snapshot runs first: it returns the uuids the other two need. | ||||
|         let uuids = self | ||||
|             .uuid_resolver_handle | ||||
|             .snapshot(temp_snapshot_path.clone()) | ||||
|             .await?; | ||||
|         let index_snapshot = self | ||||
|             .index_handle | ||||
|             .snapshot(uuids.clone(), temp_snapshot_path.clone()); | ||||
|         let updates_snapshot = self.update_handle.snapshot(uuids, temp_snapshot_path); | ||||
|         // Both requests are awaited together; a failure in either one is returned to | ||||
|         // the caller instead of being printed and dropped. | ||||
|         let (updates_result, index_result) = tokio::join!(updates_snapshot, index_snapshot); | ||||
|         updates_result?; | ||||
|         index_result?; | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -55,6 +55,11 @@ enum UpdateMsg<D> { | ||||
|         uuid: Uuid, | ||||
|         ret: oneshot::Sender<Result<()>>, | ||||
|     }, | ||||
|     Snapshot { | ||||
|         uuids: Vec<Uuid>, | ||||
|         path: PathBuf, | ||||
|         ret: oneshot::Sender<Result<()>>, | ||||
|     } | ||||
| } | ||||
|  | ||||
| struct UpdateActor<D, S> { | ||||
| @@ -113,6 +118,9 @@ where | ||||
|                 Some(Create { uuid, ret }) => { | ||||
|                     let _ = ret.send(self.handle_create(uuid).await); | ||||
|                 } | ||||
|                 Some(Snapshot { uuids, path, ret }) => { | ||||
|                     let _ = ret.send(self.handle_snapshot(uuids, path).await); | ||||
|                 } | ||||
|                 None => break, | ||||
|             } | ||||
|         } | ||||
| @@ -232,6 +240,16 @@ where | ||||
|         let _ = self.store.get_or_create(uuid).await?; | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     /// Placeholder snapshot handler for the update actor. | ||||
|     /// | ||||
|     /// Neither `uuids` nor `path` is read: the body prints a start message, awaits a | ||||
|     /// two-second tokio sleep, prints a completion message and returns `Ok(())`. | ||||
|     async fn handle_snapshot(&self, uuids: Vec<Uuid>, path: PathBuf) -> Result<()> { | ||||
|         let pause = std::time::Duration::from_secs(2); | ||||
|  | ||||
|         println!("performing update snapshot"); | ||||
|         tokio::time::sleep(pause).await; | ||||
|         println!("Update snapshot done"); | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[derive(Clone)] | ||||
| @@ -274,7 +292,9 @@ where | ||||
|         let _ = self.sender.send(msg).await; | ||||
|         receiver.await.expect("update actor killed.") | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<D> UpdateActorHandle<D> { | ||||
|     pub async fn get_all_updates_status(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> { | ||||
|         let (ret, receiver) = oneshot::channel(); | ||||
|         let msg = UpdateMsg::ListUpdates { uuid, ret }; | ||||
| @@ -302,6 +322,13 @@ where | ||||
|         let _ = self.sender.send(msg).await; | ||||
|         receiver.await.expect("update actor killed.") | ||||
|     } | ||||
|  | ||||
|     /// Sends a `Snapshot` request for `uuids` and `path` to the update actor and | ||||
|     /// returns its answer. | ||||
|     /// | ||||
|     /// # Panics | ||||
|     /// | ||||
|     /// Panics if the reply channel is closed before an answer arrives. | ||||
|     pub async fn snapshot(&self, uuids: Vec<Uuid>, path: PathBuf) -> Result<()> { | ||||
|         let (ret, receiver) = oneshot::channel(); | ||||
|         // A failed send is ignored here; it shows up as a closed reply channel below. | ||||
|         let _ = self | ||||
|             .sender | ||||
|             .send(UpdateMsg::Snapshot { uuids, path, ret }) | ||||
|             .await; | ||||
|         receiver.await.expect("update actor killed.") | ||||
|     } | ||||
| } | ||||
|  | ||||
| struct MapUpdateStoreStore { | ||||
|   | ||||
| @@ -1,4 +1,5 @@ | ||||
| use std::{fs::create_dir_all, path::Path}; | ||||
| use std::fs::create_dir_all; | ||||
| use std::path::{Path, PathBuf}; | ||||
|  | ||||
| use heed::{ | ||||
|     types::{ByteSlice, Str}, | ||||
| @@ -8,6 +9,7 @@ use log::{info, warn}; | ||||
| use thiserror::Error; | ||||
| use tokio::sync::{mpsc, oneshot}; | ||||
| use uuid::Uuid; | ||||
| use heed::CompactionOption; | ||||
|  | ||||
| pub type Result<T> = std::result::Result<T, UuidError>; | ||||
|  | ||||
| @@ -32,6 +34,10 @@ enum UuidResolveMsg { | ||||
|     List { | ||||
|         ret: oneshot::Sender<Result<Vec<(String, Uuid)>>>, | ||||
|     }, | ||||
|     SnapshotRequest { | ||||
|         path: PathBuf, | ||||
|         ret: oneshot::Sender<Result<Vec<Uuid>>>, | ||||
|     }, | ||||
| } | ||||
|  | ||||
| struct UuidResolverActor<S> { | ||||
| @@ -66,6 +72,9 @@ impl<S: UuidStore> UuidResolverActor<S> { | ||||
|                 Some(List { ret }) => { | ||||
|                     let _ = ret.send(self.handle_list().await); | ||||
|                 } | ||||
|                 Some(SnapshotRequest { path, ret }) => { | ||||
|                     let _ = ret.send(self.handle_snapshot(path).await); | ||||
|                 } | ||||
|                 // all senders have been dropped, need to quit. | ||||
|                 None => break, | ||||
|             } | ||||
| @@ -106,6 +115,10 @@ impl<S: UuidStore> UuidResolverActor<S> { | ||||
|         let result = self.store.list().await?; | ||||
|         Ok(result) | ||||
|     } | ||||
|  | ||||
|     /// Forwards the snapshot request for `path` to the underlying store and returns | ||||
|     /// the uuids it reports. | ||||
|     async fn handle_snapshot(&self, path: PathBuf) -> Result<Vec<Uuid>> { | ||||
|         let uuids = self.store.snapshot(path).await?; | ||||
|         Ok(uuids) | ||||
|     } | ||||
| } | ||||
|  | ||||
| fn is_index_uid_valid(uid: &str) -> bool { | ||||
| @@ -171,6 +184,15 @@ impl UuidResolverHandle { | ||||
|             .await | ||||
|             .expect("Uuid resolver actor has been killed")?) | ||||
|     } | ||||
|  | ||||
|     /// Sends a `SnapshotRequest` for `path` to the uuid resolver actor and returns | ||||
|     /// the uuids it answers with. | ||||
|     /// | ||||
|     /// # Panics | ||||
|     /// | ||||
|     /// Panics if the reply channel is closed before an answer arrives. | ||||
|     pub async fn snapshot(&self, path: PathBuf) -> Result<Vec<Uuid>> { | ||||
|         let (ret, receiver) = oneshot::channel(); | ||||
|         // A failed send is ignored here; it shows up as a closed reply channel below. | ||||
|         let _ = self | ||||
|             .sender | ||||
|             .send(UuidResolveMsg::SnapshotRequest { path, ret }) | ||||
|             .await; | ||||
|         let reply = receiver | ||||
|             .await | ||||
|             .expect("Uuid resolver actor has been killed"); | ||||
|         Ok(reply?) | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[derive(Debug, Error)] | ||||
| @@ -197,6 +219,7 @@ trait UuidStore { | ||||
|     async fn get_uuid(&self, uid: String) -> Result<Option<Uuid>>; | ||||
|     async fn delete(&self, uid: String) -> Result<Option<Uuid>>; | ||||
|     async fn list(&self) -> Result<Vec<(String, Uuid)>>; | ||||
|     async fn snapshot(&self, path: PathBuf) -> Result<Vec<Uuid>>; | ||||
| } | ||||
|  | ||||
| struct HeedUuidStore { | ||||
| @@ -242,7 +265,6 @@ impl UuidStore for HeedUuidStore { | ||||
|         }) | ||||
|         .await? | ||||
|     } | ||||
|  | ||||
|     async fn get_uuid(&self, name: String) -> Result<Option<Uuid>> { | ||||
|         let env = self.env.clone(); | ||||
|         let db = self.db; | ||||
| @@ -292,4 +314,23 @@ impl UuidStore for HeedUuidStore { | ||||
|         }) | ||||
|         .await? | ||||
|     } | ||||
|  | ||||
|     /// Copies the uuid database to `<path>/uuids` and returns every uuid stored in it. | ||||
|     /// | ||||
|     /// The work runs on a blocking task. A write transaction is opened first and kept | ||||
|     /// alive while the uuids are listed and the environment is copied with | ||||
|     /// compaction enabled. | ||||
|     async fn snapshot(&self, mut path: PathBuf) -> Result<Vec<Uuid>> { | ||||
|         let env = self.env.clone(); | ||||
|         let db = self.db; | ||||
|         path.push("uuids"); | ||||
|         tokio::task::spawn_blocking(move || { | ||||
|             // Write transaction to acquire a lock on the database. | ||||
|             let txn = env.write_txn()?; | ||||
|             // Only the values are kept; each one is parsed back into a `Uuid`. | ||||
|             let entries = db | ||||
|                 .iter(&txn)? | ||||
|                 .map(|entry| -> Result<Uuid> { | ||||
|                     let (_, raw) = entry?; | ||||
|                     Ok(Uuid::from_slice(raw)?) | ||||
|                 }) | ||||
|                 .collect::<Result<Vec<Uuid>>>()?; | ||||
|             env.copy_to_path(path, CompactionOption::Enabled)?; | ||||
|             Ok(entries) | ||||
|         }) | ||||
|         .await? | ||||
|     } | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user