mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-25 13:06:27 +00:00 
			
		
		
		
	Merge #2445
2445: Seek-based tasks list r=Kerollmops a=Kerollmops This PR implements the seek-based pagination for the tasks list following [the spec](https://github.com/meilisearch/specifications/pull/115). Co-authored-by: Kerollmops <clement@meilisearch.com>
This commit is contained in:
		| @@ -4,7 +4,7 @@ use crate::snapshot::SnapshotJob; | ||||
|  | ||||
| use super::task::{Task, TaskEvent}; | ||||
|  | ||||
| pub type BatchId = u64; | ||||
| pub type BatchId = u32; | ||||
|  | ||||
| #[derive(Debug)] | ||||
| pub enum BatchContent { | ||||
|   | ||||
| @@ -342,18 +342,10 @@ impl Scheduler { | ||||
|     } | ||||
|  | ||||
|     async fn fetch_pending_tasks(&mut self) -> Result<()> { | ||||
|         // We must NEVER re-enqueue an already processed task! Its content uuid would point to a nonexistent file. | ||||
|         // | ||||
|         // TODO(marin): This may create some latency when the first batch lazy loads the pending updates. | ||||
|         let mut filter = TaskFilter::default(); | ||||
|         filter.filter_fn(|task| !task.is_finished()); | ||||
|  | ||||
|         self.store | ||||
|             .list_tasks(Some(self.next_fetched_task_id), Some(filter), None) | ||||
|             .fetch_unfinished_tasks(Some(self.next_fetched_task_id)) | ||||
|             .await? | ||||
|             .into_iter() | ||||
|             // The tasks arrive in reverse order, and we need to insert them in order. | ||||
|             .rev() | ||||
|             .for_each(|t| { | ||||
|                 self.next_fetched_task_id = t.id + 1; | ||||
|                 self.register_task(t); | ||||
|   | ||||
| @@ -10,7 +10,7 @@ use crate::{ | ||||
|     index_resolver::IndexUid, | ||||
| }; | ||||
|  | ||||
| pub type TaskId = u64; | ||||
| pub type TaskId = u32; | ||||
|  | ||||
| #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] | ||||
| #[cfg_attr(test, derive(proptest_derive::Arbitrary))] | ||||
|   | ||||
| @@ -41,6 +41,10 @@ impl TaskFilter { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     fn filtered_indexes(&self) -> Option<&HashSet<String>> { | ||||
|         self.indexes.as_ref() | ||||
|     } | ||||
|  | ||||
|     /// Adds an index to the filter, so the filter must match this index. | ||||
|     pub fn filter_index(&mut self, index: String) { | ||||
|         self.indexes | ||||
| @@ -186,6 +190,17 @@ impl TaskStore { | ||||
|         Ok(tasks) | ||||
|     } | ||||
|  | ||||
|     pub async fn fetch_unfinished_tasks(&self, offset: Option<TaskId>) -> Result<Vec<Task>> { | ||||
|         let store = self.store.clone(); | ||||
|  | ||||
|         tokio::task::spawn_blocking(move || { | ||||
|             let txn = store.rtxn()?; | ||||
|             let tasks = store.fetch_unfinished_tasks(&txn, offset)?; | ||||
|             Ok(tasks) | ||||
|         }) | ||||
|         .await? | ||||
|     } | ||||
|  | ||||
|     pub async fn list_tasks( | ||||
|         &self, | ||||
|         offset: Option<TaskId>, | ||||
| @@ -325,6 +340,13 @@ pub mod test { | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         pub async fn fetch_unfinished_tasks(&self, from: Option<TaskId>) -> Result<Vec<Task>> { | ||||
|             match self { | ||||
|                 Self::Real(s) => s.fetch_unfinished_tasks(from).await, | ||||
|                 Self::Mock(m) => unsafe { m.get("fetch_unfinished_tasks").call(from) }, | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         pub async fn list_tasks( | ||||
|             &self, | ||||
|             from: Option<TaskId>, | ||||
| @@ -378,7 +400,7 @@ pub mod test { | ||||
|  | ||||
|         let mut runner = TestRunner::new(Config::default()); | ||||
|         runner | ||||
|             .run(&(0..100u64).prop_map(gen_task), |task| { | ||||
|             .run(&(0..100u32).prop_map(gen_task), |task| { | ||||
|                 let mut txn = store.wtxn().unwrap(); | ||||
|                 let previous_id = store.next_task_id(&mut txn).unwrap(); | ||||
|  | ||||
|   | ||||
| @@ -1,62 +1,30 @@ | ||||
| #[allow(clippy::upper_case_acronyms)] | ||||
| type BEU64 = milli::heed::zerocopy::U64<milli::heed::byteorder::BE>; | ||||
|  | ||||
| const UID_TASK_IDS: &str = "uid_task_id"; | ||||
| type BEU32 = milli::heed::zerocopy::U32<milli::heed::byteorder::BE>; | ||||
|  | ||||
| const INDEX_UIDS_TASK_IDS: &str = "index-uids-task-ids"; | ||||
| const TASKS: &str = "tasks"; | ||||
|  | ||||
| use std::borrow::Cow; | ||||
| use std::collections::BinaryHeap; | ||||
| use std::convert::TryInto; | ||||
| use std::mem::size_of; | ||||
| use std::ops::Range; | ||||
| use std::collections::HashSet; | ||||
| use std::ops::Bound::{Excluded, Unbounded}; | ||||
| use std::result::Result as StdResult; | ||||
| use std::sync::Arc; | ||||
|  | ||||
| use milli::heed::types::{ByteSlice, OwnedType, SerdeJson, Unit}; | ||||
| use milli::heed::{BytesDecode, BytesEncode, Database, Env, RoTxn, RwTxn}; | ||||
| use milli::heed::types::{OwnedType, SerdeJson, Str}; | ||||
| use milli::heed::{Database, Env, RoTxn, RwTxn}; | ||||
| use milli::heed_codec::RoaringBitmapCodec; | ||||
| use roaring::RoaringBitmap; | ||||
|  | ||||
| use crate::tasks::task::{Task, TaskId}; | ||||
|  | ||||
| use super::super::Result; | ||||
|  | ||||
| use super::TaskFilter; | ||||
|  | ||||
| enum IndexUidTaskIdCodec {} | ||||
|  | ||||
| impl<'a> BytesEncode<'a> for IndexUidTaskIdCodec { | ||||
|     type EItem = (&'a str, TaskId); | ||||
|  | ||||
|     fn bytes_encode((s, id): &'a Self::EItem) -> Option<Cow<'a, [u8]>> { | ||||
|         let size = s.len() + std::mem::size_of::<TaskId>() + 1; | ||||
|         if size > 512 { | ||||
|             return None; | ||||
|         } | ||||
|         let mut b = Vec::with_capacity(size); | ||||
|         b.extend_from_slice(s.as_bytes()); | ||||
|         // null terminate the string | ||||
|         b.push(0); | ||||
|         b.extend_from_slice(&id.to_be_bytes()); | ||||
|         Some(Cow::Owned(b)) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl<'a> BytesDecode<'a> for IndexUidTaskIdCodec { | ||||
|     type DItem = (&'a str, TaskId); | ||||
|  | ||||
|     fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> { | ||||
|         let len = bytes.len(); | ||||
|         let s_end = len.checked_sub(size_of::<TaskId>())?.checked_sub(1)?; | ||||
|         let str_bytes = &bytes[..s_end]; | ||||
|         let str = std::str::from_utf8(str_bytes).ok()?; | ||||
|         let id = TaskId::from_be_bytes(bytes[(len - size_of::<TaskId>())..].try_into().ok()?); | ||||
|         Some((str, id)) | ||||
|     } | ||||
| } | ||||
|  | ||||
| pub struct Store { | ||||
|     env: Arc<Env>, | ||||
|     uids_task_ids: Database<IndexUidTaskIdCodec, Unit>, | ||||
|     tasks: Database<OwnedType<BEU64>, SerdeJson<Task>>, | ||||
|     /// Maps an index uid to the set of tasks ids associated to it. | ||||
|     index_uid_task_ids: Database<Str, RoaringBitmapCodec>, | ||||
|     tasks: Database<OwnedType<BEU32>, SerdeJson<Task>>, | ||||
| } | ||||
|  | ||||
| impl Drop for Store { | ||||
| @@ -74,12 +42,12 @@ impl Store { | ||||
|     /// You want to patch all unfinished tasks and put them in your pending | ||||
|     /// queue with the `reset_and_return_unfinished_update` method. | ||||
|     pub fn new(env: Arc<milli::heed::Env>) -> Result<Self> { | ||||
|         let uids_task_ids = env.create_database(Some(UID_TASK_IDS))?; | ||||
|         let index_uid_task_ids = env.create_database(Some(INDEX_UIDS_TASK_IDS))?; | ||||
|         let tasks = env.create_database(Some(TASKS))?; | ||||
|  | ||||
|         Ok(Self { | ||||
|             env, | ||||
|             uids_task_ids, | ||||
|             index_uid_task_ids, | ||||
|             tasks, | ||||
|         }) | ||||
|     } | ||||
| @@ -107,121 +75,104 @@ impl Store { | ||||
|     } | ||||
|  | ||||
|     pub fn put(&self, txn: &mut RwTxn, task: &Task) -> Result<()> { | ||||
|         self.tasks.put(txn, &BEU64::new(task.id), task)?; | ||||
|         self.tasks.put(txn, &BEU32::new(task.id), task)?; | ||||
|         // only add the task to the indexes index if it has an index_uid | ||||
|         if let Some(ref index_uid) = task.index_uid { | ||||
|             self.uids_task_ids.put(txn, &(index_uid, task.id), &())?; | ||||
|         if let Some(index_uid) = &task.index_uid { | ||||
|             let mut tasks_set = self | ||||
|                 .index_uid_task_ids | ||||
|                 .get(txn, index_uid)? | ||||
|                 .unwrap_or_default(); | ||||
|  | ||||
|             tasks_set.insert(task.id); | ||||
|  | ||||
|             self.index_uid_task_ids.put(txn, index_uid, &tasks_set)?; | ||||
|         } | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     pub fn get(&self, txn: &RoTxn, id: TaskId) -> Result<Option<Task>> { | ||||
|         let task = self.tasks.get(txn, &BEU64::new(id))?; | ||||
|         let task = self.tasks.get(txn, &BEU32::new(id))?; | ||||
|         Ok(task) | ||||
|     } | ||||
|  | ||||
|     pub fn list_tasks<'a>( | ||||
|     /// Returns the unfinished tasks starting from the given taskId in ascending order. | ||||
|     pub fn fetch_unfinished_tasks(&self, txn: &RoTxn, from: Option<TaskId>) -> Result<Vec<Task>> { | ||||
|         // We must NEVER re-enqueue an already processed task! Its content uuid would point to a nonexistent file. | ||||
|         // | ||||
|         // TODO(marin): This may create some latency when the first batch lazy loads the pending updates. | ||||
|         let from = from.unwrap_or_default(); | ||||
|  | ||||
|         let result: StdResult<Vec<_>, milli::heed::Error> = self | ||||
|             .tasks | ||||
|             .range(txn, &(BEU32::new(from)..))? | ||||
|             .map(|r| r.map(|(_, t)| t)) | ||||
|             .filter(|result| result.as_ref().map_or(true, |t| !t.is_finished())) | ||||
|             .collect(); | ||||
|  | ||||
|         result.map_err(Into::into) | ||||
|     } | ||||
|  | ||||
|     /// Returns all the tasks starting from the given taskId and going in descending order. | ||||
|     pub fn list_tasks( | ||||
|         &self, | ||||
|         txn: &'a RoTxn, | ||||
|         txn: &RoTxn, | ||||
|         from: Option<TaskId>, | ||||
|         filter: Option<TaskFilter>, | ||||
|         limit: Option<usize>, | ||||
|     ) -> Result<Vec<Task>> { | ||||
|         let from = from.unwrap_or_default(); | ||||
|         let range = from..limit | ||||
|             .map(|limit| (limit as u64).saturating_add(from)) | ||||
|             .unwrap_or(u64::MAX); | ||||
|         let iter: Box<dyn Iterator<Item = StdResult<_, milli::heed::Error>>> = match filter { | ||||
|             Some( | ||||
|                 ref filter @ TaskFilter { | ||||
|                     indexes: Some(_), .. | ||||
|                 }, | ||||
|             ) => { | ||||
|                 let iter = self | ||||
|                     .compute_candidates(txn, filter, range)? | ||||
|                     .into_iter() | ||||
|                     .filter_map(|id| self.tasks.get(txn, &BEU64::new(id)).transpose()); | ||||
|  | ||||
|                 Box::new(iter) | ||||
|             } | ||||
|             _ => Box::new( | ||||
|                 self.tasks | ||||
|                     .rev_range(txn, &(BEU64::new(range.start)..BEU64::new(range.end)))? | ||||
|                     .map(|r| r.map(|(_, t)| t)), | ||||
|             ), | ||||
|         let from = match from { | ||||
|             Some(from) => from, | ||||
|             None => self.tasks.last(txn)?.map_or(0, |(id, _)| id.get()), | ||||
|         }; | ||||
|  | ||||
|         let apply_fitler = |task: &StdResult<_, milli::heed::Error>| match task { | ||||
|             Ok(ref t) => filter | ||||
|         let filter_fn = |task: &Task| { | ||||
|             filter | ||||
|                 .as_ref() | ||||
|                 .and_then(|filter| filter.filter_fn.as_ref()) | ||||
|                 .map(|f| f(t)) | ||||
|                 .unwrap_or(true), | ||||
|             Err(_) => true, | ||||
|                 .and_then(|f| f.filter_fn.as_ref()) | ||||
|                 .map_or(true, |f| f(task)) | ||||
|         }; | ||||
|         // Collect 'limit' task if it exists or all of them. | ||||
|         let tasks = iter | ||||
|             .filter(apply_fitler) | ||||
|             .take(limit.unwrap_or(usize::MAX)) | ||||
|             .try_fold::<_, _, StdResult<_, milli::heed::Error>>(Vec::new(), |mut v, task| { | ||||
|                 v.push(task?); | ||||
|                 Ok(v) | ||||
|             })?; | ||||
|  | ||||
|         Ok(tasks) | ||||
|         let result: Result<Vec<_>> = match filter.as_ref().and_then(|f| f.filtered_indexes()) { | ||||
|             Some(indexes) => self | ||||
|                 .compute_candidates(txn, indexes, from)? | ||||
|                 .filter(|result| result.as_ref().map_or(true, filter_fn)) | ||||
|                 .take(limit.unwrap_or(usize::MAX)) | ||||
|                 .collect(), | ||||
|             None => self | ||||
|                 .tasks | ||||
|                 .rev_range(txn, &(..=BEU32::new(from)))? | ||||
|                 .map(|r| r.map(|(_, t)| t).map_err(Into::into)) | ||||
|                 .filter(|result| result.as_ref().map_or(true, filter_fn)) | ||||
|                 .take(limit.unwrap_or(usize::MAX)) | ||||
|                 .collect(), | ||||
|         }; | ||||
|  | ||||
|         result.map_err(Into::into) | ||||
|     } | ||||
|  | ||||
|     fn compute_candidates( | ||||
|         &self, | ||||
|         txn: &milli::heed::RoTxn, | ||||
|         filter: &TaskFilter, | ||||
|         range: Range<TaskId>, | ||||
|     ) -> Result<BinaryHeap<TaskId>> { | ||||
|         let mut candidates = BinaryHeap::new(); | ||||
|         if let Some(ref indexes) = filter.indexes { | ||||
|             for index in indexes { | ||||
|                 // We need to prefix search the null terminated string to make sure that we only | ||||
|                 // get exact matches for the index, and not other uids that would share the same | ||||
|                 // prefix, i.e test and test1. | ||||
|                 let mut index_uid = index.as_bytes().to_vec(); | ||||
|                 index_uid.push(0); | ||||
|     fn compute_candidates<'a>( | ||||
|         &'a self, | ||||
|         txn: &'a RoTxn, | ||||
|         indexes: &HashSet<String>, | ||||
|         from: TaskId, | ||||
|     ) -> Result<impl Iterator<Item = Result<Task>> + 'a> { | ||||
|         let mut candidates = RoaringBitmap::new(); | ||||
|  | ||||
|                 self.uids_task_ids | ||||
|                     .remap_key_type::<ByteSlice>() | ||||
|                     .rev_prefix_iter(txn, &index_uid)? | ||||
|                     .map(|entry| -> StdResult<_, milli::heed::Error> { | ||||
|                         let (key, _) = entry?; | ||||
|                         let (_, id) = IndexUidTaskIdCodec::bytes_decode(key) | ||||
|                             .ok_or(milli::heed::Error::Decoding)?; | ||||
|                         Ok(id) | ||||
|                     }) | ||||
|                     .skip_while(|entry| { | ||||
|                         entry | ||||
|                             .as_ref() | ||||
|                             .ok() | ||||
|                             // we skip all elements till we enter in the range | ||||
|                             .map(|key| !range.contains(key)) | ||||
|                             // if we encounter an error we return true to collect it later | ||||
|                             .unwrap_or(true) | ||||
|                     }) | ||||
|                     .take_while(|entry| { | ||||
|                         entry | ||||
|                             .as_ref() | ||||
|                             .ok() | ||||
|                             // as soon as we are out of the range we exit | ||||
|                             .map(|key| range.contains(key)) | ||||
|                             // if we encounter an error we return true to collect it later | ||||
|                             .unwrap_or(true) | ||||
|                     }) | ||||
|                     .try_for_each::<_, StdResult<(), milli::heed::Error>>(|id| { | ||||
|                         candidates.push(id?); | ||||
|                         Ok(()) | ||||
|                     })?; | ||||
|         for index_uid in indexes { | ||||
|             if let Some(tasks_set) = self.index_uid_task_ids.get(txn, index_uid)? { | ||||
|                 candidates |= tasks_set; | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         Ok(candidates) | ||||
|         candidates.remove_range((Excluded(from), Unbounded)); | ||||
|  | ||||
|         let iter = candidates | ||||
|             .into_iter() | ||||
|             .rev() | ||||
|             .filter_map(|id| self.get(txn, id).transpose()); | ||||
|  | ||||
|         Ok(iter) | ||||
|     } | ||||
| } | ||||
|  | ||||
| @@ -230,8 +181,6 @@ pub mod test { | ||||
|     use itertools::Itertools; | ||||
|     use milli::heed::EnvOpenOptions; | ||||
|     use nelson::Mocker; | ||||
|     use proptest::collection::vec; | ||||
|     use proptest::prelude::*; | ||||
|     use tempfile::TempDir; | ||||
|  | ||||
|     use crate::index_resolver::IndexUid; | ||||
| @@ -305,9 +254,20 @@ pub mod test { | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         pub fn list_tasks<'a>( | ||||
|         pub fn fetch_unfinished_tasks( | ||||
|             &self, | ||||
|             txn: &'a RoTxn, | ||||
|             txn: &RoTxn, | ||||
|             from: Option<TaskId>, | ||||
|         ) -> Result<Vec<Task>> { | ||||
|             match self { | ||||
|                 MockStore::Real(index) => index.fetch_unfinished_tasks(txn, from), | ||||
|                 MockStore::Fake(_) => todo!(), | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         pub fn list_tasks( | ||||
|             &self, | ||||
|             txn: &RoTxn, | ||||
|             from: Option<TaskId>, | ||||
|             filter: Option<TaskFilter>, | ||||
|             limit: Option<usize>, | ||||
| @@ -429,26 +389,4 @@ pub mod test { | ||||
|             "test" | ||||
|         ); | ||||
|     } | ||||
|  | ||||
|     proptest! { | ||||
|         #[test] | ||||
|         fn encode_decode_roundtrip(index_uid in any::<IndexUid>(), task_id in 0..TaskId::MAX) { | ||||
|             let value = (index_uid.as_ref(), task_id); | ||||
|             let bytes = IndexUidTaskIdCodec::bytes_encode(&value).unwrap(); | ||||
|             let (index, id) = IndexUidTaskIdCodec::bytes_decode(bytes.as_ref()).unwrap(); | ||||
|             assert_eq!(&*index_uid, index); | ||||
|             assert_eq!(task_id, id); | ||||
|         } | ||||
|  | ||||
|         #[test] | ||||
|         fn encode_doesnt_crash(index_uid in "\\PC*", task_id in 0..TaskId::MAX) { | ||||
|             let value = (index_uid.as_ref(), task_id); | ||||
|             IndexUidTaskIdCodec::bytes_encode(&value); | ||||
|         } | ||||
|  | ||||
|         #[test] | ||||
|         fn decode_doesnt_crash(bytes in vec(any::<u8>(), 0..1000)) { | ||||
|             IndexUidTaskIdCodec::bytes_decode(&bytes); | ||||
|         } | ||||
|     } | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user