mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-24 20:46:27 +00:00 
			
		
		
		
	Merge pull request #4969 from meilisearch/indexer-edition-2024-try-map
Indexer edition 2024 try map
This commit is contained in:
		
							
								
								
									
										327
									
								
								milli/src/update/new/append_only_vec.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										327
									
								
								milli/src/update/new/append_only_vec.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,327 @@ | |||||||
|  | // Code taken from <https://github.com/droundy/append-only-vec/blob/main/src/lib.rs> | ||||||
|  | // and modified in order to get a ref mut instead of the index of newly inserted items. | ||||||
|  |  | ||||||
|  | //! AppendOnlyVec | ||||||
|  | //! | ||||||
|  | //! This is a pretty simple type, which is a vector that you can push into and | ||||||
|  | //! receive a reference to the item you just inserted. The data structure never | ||||||
|  | //! moves an element once allocated, so you can push to the vec even while holding | ||||||
|  | //! mutable references to elements that have already been pushed. | ||||||
|  | //! | ||||||
|  | //! ### Scaling | ||||||
|  | //! | ||||||
|  | //! 1. Accessing an element is O(1), but slightly more expensive than for a | ||||||
|  | //!    standard `Vec`. | ||||||
|  | //! | ||||||
|  | //! 2. Pushing a new element amortizes to O(1), but may require allocation of a | ||||||
|  | //!    new chunk. | ||||||
|  | //! | ||||||
|  | //! ### Example | ||||||
|  | //! | ||||||
|  | //! ``` | ||||||
|  | //! use append_only_vec::AppendOnlyVec; | ||||||
|  | //! | ||||||
|  | //! static V: AppendOnlyVec<String> = AppendOnlyVec::<String>::new(); | ||||||
|  | //! let mut threads = Vec::new(); | ||||||
|  | //! for thread_num in 0..10 { | ||||||
|  | //!     threads.push(std::thread::spawn(move || { | ||||||
|  | //!          for n in 0..100 { | ||||||
|  | //!               let s = format!("thread {} says {}", thread_num, n); | ||||||
|  | //!               let which = V.push(s.clone()); | ||||||
|  | //!               assert_eq!(&which, &s); | ||||||
|  | //!          } | ||||||
|  | //!     })); | ||||||
|  | //! } | ||||||
|  | //! | ||||||
|  | //! for t in threads { | ||||||
|  | //!    t.join(); | ||||||
|  | //! } | ||||||
|  | //! | ||||||
|  | //! assert_eq!(V.len(), 1000); | ||||||
|  | //! ``` | ||||||
|  |  | ||||||
|  | use std::cell::UnsafeCell; | ||||||
|  | use std::fmt::Debug; | ||||||
|  | use std::ptr; | ||||||
|  | use std::sync::atomic::{AtomicUsize, Ordering}; | ||||||
|  |  | ||||||
|  | pub struct AppendOnlyVec<T> { | ||||||
|  |     count: AtomicUsize, | ||||||
|  |     _reserved: AtomicUsize, | ||||||
|  |     data: [UnsafeCell<*mut T>; BITS_USED - 1 - 3], | ||||||
|  | } | ||||||
|  |  | ||||||
|  | unsafe impl<T: Send> Send for AppendOnlyVec<T> {} | ||||||
|  | unsafe impl<T: Sync + Send> Sync for AppendOnlyVec<T> {} | ||||||
|  |  | ||||||
|  | const BITS: usize = std::mem::size_of::<usize>() * 8; | ||||||
|  |  | ||||||
|  | #[cfg(target_arch = "x86_64")] | ||||||
|  | const BITS_USED: usize = 48; | ||||||
|  | #[cfg(all(not(target_arch = "x86_64"), target_pointer_width = "64"))] | ||||||
|  | const BITS_USED: usize = 64; | ||||||
|  | #[cfg(target_pointer_width = "32")] | ||||||
|  | const BITS_USED: usize = 32; | ||||||
|  |  | ||||||
|  | // This takes an index into a vec, and determines which data array will hold it | ||||||
|  | // (the first return value), and what the index will be into that data array | ||||||
|  | // (second return value) | ||||||
|  | // | ||||||
|  | // The ith data array holds 1<<i values. | ||||||
|  | const fn indices(i: usize) -> (u32, usize) { | ||||||
|  |     let i = i + 8; | ||||||
|  |     let bin = BITS as u32 - 1 - i.leading_zeros(); | ||||||
|  |     let bin = bin - 3; | ||||||
|  |     let offset = i - bin_size(bin); | ||||||
|  |     (bin, offset) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | const fn bin_size(array: u32) -> usize { | ||||||
|  |     (1 << 3) << array | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[test] | ||||||
|  | fn test_indices() { | ||||||
|  |     for i in 0..32 { | ||||||
|  |         println!("{:3}: {} {}", i, indices(i).0, indices(i).1); | ||||||
|  |     } | ||||||
|  |     let mut array = 0; | ||||||
|  |     let mut offset = 0; | ||||||
|  |     let mut index = 0; | ||||||
|  |     while index < 1000 { | ||||||
|  |         index += 1; | ||||||
|  |         offset += 1; | ||||||
|  |         if offset >= bin_size(array) { | ||||||
|  |             offset = 0; | ||||||
|  |             array += 1; | ||||||
|  |         } | ||||||
|  |         assert_eq!(indices(index), (array, offset)); | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<T> AppendOnlyVec<T> { | ||||||
|  |     const EMPTY: UnsafeCell<*mut T> = UnsafeCell::new(ptr::null_mut()); | ||||||
|  |  | ||||||
|  |     /// Allocate a new empty array. | ||||||
|  |     pub const fn new() -> Self { | ||||||
|  |         AppendOnlyVec { | ||||||
|  |             count: AtomicUsize::new(0), | ||||||
|  |             _reserved: AtomicUsize::new(0), | ||||||
|  |             data: [Self::EMPTY; BITS_USED - 1 - 3], | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Find the length of the array. | ||||||
|  |     #[inline] | ||||||
|  |     pub fn len(&self) -> usize { | ||||||
|  |         self.count.load(Ordering::Acquire) | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     fn layout(array: u32) -> std::alloc::Layout { | ||||||
|  |         std::alloc::Layout::array::<T>(bin_size(array)).unwrap() | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Append an element to the array and get a mutable ref to it. | ||||||
|  |     /// | ||||||
|  |     /// This is notable in that it doesn't require a `&mut self`, because it | ||||||
|  |     /// does appropriate atomic synchronization. | ||||||
|  |     pub fn push(&self, val: T) -> &mut T { | ||||||
|  |         let idx = self._reserved.fetch_add(1, Ordering::Relaxed); | ||||||
|  |         let (array, offset) = indices(idx); | ||||||
|  |         let ptr = if self.len() < 1 + idx - offset { | ||||||
|  |             // We are working on a new array, which may not have been allocated... | ||||||
|  |             if offset == 0 { | ||||||
|  |                 // It is our job to allocate the array! The size of the array | ||||||
|  |                 // is determined in the self.layout method, which needs to be | ||||||
|  |                 // consistent with the indices function. | ||||||
|  |                 let layout = Self::layout(array); | ||||||
|  |                 let ptr = unsafe { std::alloc::alloc(layout) } as *mut T; | ||||||
|  |                 unsafe { | ||||||
|  |                     *self.data[array as usize].get() = ptr; | ||||||
|  |                 } | ||||||
|  |                 ptr | ||||||
|  |             } else { | ||||||
|  |                 // We need to wait for the array to be allocated. | ||||||
|  |                 while self.len() < 1 + idx - offset { | ||||||
|  |                     std::hint::spin_loop(); | ||||||
|  |                 } | ||||||
|  |                 // The Ordering::Acquire semantics of self.len() ensures that | ||||||
|  |                 // this pointer read will get the non-null pointer allocated | ||||||
|  |                 // above. | ||||||
|  |                 unsafe { *self.data[array as usize].get() } | ||||||
|  |             } | ||||||
|  |         } else { | ||||||
|  |             // The Ordering::Acquire semantics of self.len() ensures that | ||||||
|  |             // this pointer read will get the non-null pointer allocated | ||||||
|  |             // above. | ||||||
|  |             unsafe { *self.data[array as usize].get() } | ||||||
|  |         }; | ||||||
|  |  | ||||||
|  |         // The contents of this offset are guaranteed to be unused (so far) | ||||||
|  |         // because we got the idx from our fetch_add above, and ptr is | ||||||
|  |         // guaranteed to be valid because of the loop we used above, which used | ||||||
|  |         // self.len() which has Ordering::Acquire semantics. | ||||||
|  |         unsafe { (ptr.add(offset)).write(val) }; | ||||||
|  |  | ||||||
|  |         // Now we need to increase the size of the vec, so it can get read. We | ||||||
|  |         // use Release upon success, to ensure that the value which we wrote is | ||||||
|  |         // visible to any thread that has confirmed that the count is big enough | ||||||
|  |         // to read that element. In case of failure, we can be relaxed, since | ||||||
|  |         // we don't do anything with the result other than try again. | ||||||
|  |         while self | ||||||
|  |             .count | ||||||
|  |             .compare_exchange(idx, idx + 1, Ordering::Release, Ordering::Relaxed) | ||||||
|  |             .is_err() | ||||||
|  |         { | ||||||
|  |             // This means that someone else *started* pushing before we started, | ||||||
|  |             // but hasn't yet finished. We have to wait for them to finish | ||||||
|  |             // pushing before we can update the count. Note that using a | ||||||
|  |             // spinloop here isn't really ideal, but except when allocating a | ||||||
|  |             // new array, the window between reserving space and using it is | ||||||
|  |             // pretty small, so contention will hopefully be rare, and having a | ||||||
|  |             // context switch during that interval will hopefully be vanishingly | ||||||
|  |             // unlikely. | ||||||
|  |             std::hint::spin_loop(); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         unsafe { &mut *ptr } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// Convert into a standard `Vec`. | ||||||
|  |     pub fn into_vec(self) -> Vec<T> { | ||||||
|  |         let mut vec = Vec::with_capacity(self.len()); | ||||||
|  |  | ||||||
|  |         for idx in 0..self.len() { | ||||||
|  |             let (array, offset) = indices(idx); | ||||||
|  |             // We use a Relaxed load of the pointer, because the loop above (which | ||||||
|  |             // ends before `self.len()`) should ensure that the data we want is | ||||||
|  |             // already visible, since it Acquired `self.count` which synchronizes | ||||||
|  |             // with the write in `self.push`. | ||||||
|  |             let ptr = unsafe { *self.data[array as usize].get() }; | ||||||
|  |  | ||||||
|  |             // Copy the element value. The copy remaining in the array must not | ||||||
|  |             // be used again (i.e. make sure we do not drop it) | ||||||
|  |             let value = unsafe { ptr.add(offset).read() }; | ||||||
|  |  | ||||||
|  |             vec.push(value); | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         // Prevent dropping the copied-out values by marking the count as 0 before | ||||||
|  |         // our own drop is run | ||||||
|  |         self.count.store(0, Ordering::Relaxed); | ||||||
|  |  | ||||||
|  |         vec | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<T> Default for AppendOnlyVec<T> { | ||||||
|  |     fn default() -> Self { | ||||||
|  |         Self::new() | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<T> Debug for AppendOnlyVec<T> { | ||||||
|  |     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||||||
|  |         f.debug_struct("AppendOnlyVec").field("len", &self.len()).finish() | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<T> Drop for AppendOnlyVec<T> { | ||||||
|  |     fn drop(&mut self) { | ||||||
|  |         // First we'll drop all the `T` in a slightly sloppy way.  FIXME this | ||||||
|  |         // could be optimized to avoid reloading the `ptr`. | ||||||
|  |         for idx in 0..self.len() { | ||||||
|  |             let (array, offset) = indices(idx); | ||||||
|  |             // We use a Relaxed load of the pointer, because the loop above (which | ||||||
|  |             // ends before `self.len()`) should ensure that the data we want is | ||||||
|  |             // already visible, since it Acquired `self.count` which synchronizes | ||||||
|  |             // with the write in `self.push`. | ||||||
|  |             let ptr = unsafe { *self.data[array as usize].get() }; | ||||||
|  |             unsafe { | ||||||
|  |                 ptr::drop_in_place(ptr.add(offset)); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |         // Now we will free all the arrays. | ||||||
|  |         for array in 0..self.data.len() as u32 { | ||||||
|  |             // This load is relaxed because no other thread can have a reference | ||||||
|  |             // to Self because we have a &mut self. | ||||||
|  |             let ptr = unsafe { *self.data[array as usize].get() }; | ||||||
|  |             if !ptr.is_null() { | ||||||
|  |                 let layout = Self::layout(array); | ||||||
|  |                 unsafe { std::alloc::dealloc(ptr as *mut u8, layout) }; | ||||||
|  |             } else { | ||||||
|  |                 break; | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<T> IntoIterator for AppendOnlyVec<T> { | ||||||
|  |     type Item = T; | ||||||
|  |     type IntoIter = std::vec::IntoIter<T>; | ||||||
|  |  | ||||||
|  |     fn into_iter(self) -> Self::IntoIter { | ||||||
|  |         self.into_vec().into_iter() | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[test] | ||||||
|  | fn test_parallel_pushing() { | ||||||
|  |     use std::sync::Arc; | ||||||
|  |     let v = Arc::new(AppendOnlyVec::<u64>::new()); | ||||||
|  |     let mut threads = Vec::new(); | ||||||
|  |     const N: u64 = 100; | ||||||
|  |     for thread_num in 0..N { | ||||||
|  |         let v = v.clone(); | ||||||
|  |         threads.push(std::thread::spawn(move || { | ||||||
|  |             let which1 = v.push(thread_num); | ||||||
|  |             let which2 = v.push(thread_num); | ||||||
|  |             assert_eq!(*which1, thread_num); | ||||||
|  |             assert_eq!(*which2, thread_num); | ||||||
|  |         })); | ||||||
|  |     } | ||||||
|  |     for t in threads { | ||||||
|  |         t.join().unwrap(); | ||||||
|  |     } | ||||||
|  |     let v = Arc::into_inner(v).unwrap().into_vec(); | ||||||
|  |     for thread_num in 0..N { | ||||||
|  |         assert_eq!(2, v.iter().copied().filter(|&x| x == thread_num).count()); | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[test] | ||||||
|  | fn test_into_vec() { | ||||||
|  |     struct SafeToDrop(bool); | ||||||
|  |  | ||||||
|  |     impl Drop for SafeToDrop { | ||||||
|  |         fn drop(&mut self) { | ||||||
|  |             assert!(self.0); | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     let v = AppendOnlyVec::new(); | ||||||
|  |  | ||||||
|  |     for _ in 0..50 { | ||||||
|  |         v.push(SafeToDrop(false)); | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     let mut v = v.into_vec(); | ||||||
|  |     assert_eq!(v.len(), 50); | ||||||
|  |  | ||||||
|  |     for i in v.iter_mut() { | ||||||
|  |         i.0 = true; | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | #[test] | ||||||
|  | fn test_push_then_index_mut() { | ||||||
|  |     let v = AppendOnlyVec::<usize>::new(); | ||||||
|  |     for i in 0..1024 { | ||||||
|  |         *v.push(i) += 1; | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     let v = v.into_vec(); | ||||||
|  |     for i in 0..1024 { | ||||||
|  |         assert_eq!(v[i], 2 * i); | ||||||
|  |     } | ||||||
|  | } | ||||||
| @@ -1,6 +1,7 @@ | |||||||
| use std::collections::HashSet; | use std::collections::HashSet; | ||||||
| use std::fmt::Debug; | use std::fmt::Debug; | ||||||
| use std::fs::File; | use std::fs::File; | ||||||
|  | use std::sync::Arc; | ||||||
|  |  | ||||||
| use grenad::{MergeFunction, Merger}; | use grenad::{MergeFunction, Merger}; | ||||||
| use heed::RoTxn; | use heed::RoTxn; | ||||||
| @@ -11,10 +12,14 @@ use super::super::cache::CboCachedSorter; | |||||||
| use super::facet_document::extract_document_facets; | use super::facet_document::extract_document_facets; | ||||||
| use super::FacetKind; | use super::FacetKind; | ||||||
| use crate::facet::value_encoding::f64_into_bytes; | use crate::facet::value_encoding::f64_into_bytes; | ||||||
|  | use crate::update::new::append_only_vec::AppendOnlyVec; | ||||||
| use crate::update::new::extract::DocidsExtractor; | use crate::update::new::extract::DocidsExtractor; | ||||||
| use crate::update::new::{DocumentChange, ItemsPool}; | use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; | ||||||
|  | use crate::update::new::DocumentChange; | ||||||
| use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; | use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; | ||||||
| use crate::{DocumentId, FieldId, GlobalFieldsIdsMap, Index, Result, MAX_FACET_VALUE_LENGTH}; | use crate::{ | ||||||
|  |     DocumentId, Error, FieldId, GlobalFieldsIdsMap, Index, Result, MAX_FACET_VALUE_LENGTH, | ||||||
|  | }; | ||||||
| pub struct FacetedDocidsExtractor; | pub struct FacetedDocidsExtractor; | ||||||
|  |  | ||||||
| impl FacetedDocidsExtractor { | impl FacetedDocidsExtractor { | ||||||
| @@ -195,7 +200,9 @@ impl DocidsExtractor for FacetedDocidsExtractor { | |||||||
|         index: &Index, |         index: &Index, | ||||||
|         fields_ids_map: &GlobalFieldsIdsMap, |         fields_ids_map: &GlobalFieldsIdsMap, | ||||||
|         indexer: GrenadParameters, |         indexer: GrenadParameters, | ||||||
|         document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>, |         document_changes: impl IntoParallelIterator< | ||||||
|  |             Item = std::result::Result<DocumentChange, Arc<Error>>, | ||||||
|  |         >, | ||||||
|     ) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> { |     ) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> { | ||||||
|         let max_memory = indexer.max_memory_by_thread(); |         let max_memory = indexer.max_memory_by_thread(); | ||||||
|  |  | ||||||
| @@ -203,35 +210,32 @@ impl DocidsExtractor for FacetedDocidsExtractor { | |||||||
|         let attributes_to_extract = Self::attributes_to_extract(&rtxn, index)?; |         let attributes_to_extract = Self::attributes_to_extract(&rtxn, index)?; | ||||||
|         let attributes_to_extract: Vec<_> = |         let attributes_to_extract: Vec<_> = | ||||||
|             attributes_to_extract.iter().map(|s| s.as_ref()).collect(); |             attributes_to_extract.iter().map(|s| s.as_ref()).collect(); | ||||||
|  |         let caches = AppendOnlyVec::new(); | ||||||
|         let context_pool = ItemsPool::new(|| { |  | ||||||
|             Ok(( |  | ||||||
|                 index.read_txn()?, |  | ||||||
|                 fields_ids_map.clone(), |  | ||||||
|                 Vec::new(), |  | ||||||
|                 CboCachedSorter::new( |  | ||||||
|                     // TODO use a better value |  | ||||||
|                     100.try_into().unwrap(), |  | ||||||
|                     create_sorter( |  | ||||||
|                         grenad::SortAlgorithm::Stable, |  | ||||||
|                         MergeDeladdCboRoaringBitmaps, |  | ||||||
|                         indexer.chunk_compression_type, |  | ||||||
|                         indexer.chunk_compression_level, |  | ||||||
|                         indexer.max_nb_chunks, |  | ||||||
|                         max_memory, |  | ||||||
|                     ), |  | ||||||
|                 ), |  | ||||||
|             )) |  | ||||||
|         }); |  | ||||||
|  |  | ||||||
|         { |         { | ||||||
|             let span = |             let span = | ||||||
|                 tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); |                 tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); | ||||||
|             let _entered = span.enter(); |             let _entered = span.enter(); | ||||||
|             document_changes.into_par_iter().try_for_each(|document_change| { |             document_changes.into_par_iter().try_arc_for_each_try_init( | ||||||
|                 context_pool.with(|(rtxn, fields_ids_map, buffer, cached_sorter)| { |                 || { | ||||||
|  |                     let rtxn = index.read_txn().map_err(Error::from)?; | ||||||
|  |                     let cache = caches.push(CboCachedSorter::new( | ||||||
|  |                         // TODO use a better value | ||||||
|  |                         100.try_into().unwrap(), | ||||||
|  |                         create_sorter( | ||||||
|  |                             grenad::SortAlgorithm::Stable, | ||||||
|  |                             MergeDeladdCboRoaringBitmaps, | ||||||
|  |                             indexer.chunk_compression_type, | ||||||
|  |                             indexer.chunk_compression_level, | ||||||
|  |                             indexer.max_nb_chunks, | ||||||
|  |                             max_memory, | ||||||
|  |                         ), | ||||||
|  |                     )); | ||||||
|  |                     Ok((rtxn, fields_ids_map.clone(), Vec::new(), cache)) | ||||||
|  |                 }, | ||||||
|  |                 |(rtxn, fields_ids_map, buffer, cached_sorter), document_change| { | ||||||
|                     Self::extract_document_change( |                     Self::extract_document_change( | ||||||
|                         &*rtxn, |                         rtxn, | ||||||
|                         index, |                         index, | ||||||
|                         buffer, |                         buffer, | ||||||
|                         fields_ids_map, |                         fields_ids_map, | ||||||
| @@ -239,8 +243,9 @@ impl DocidsExtractor for FacetedDocidsExtractor { | |||||||
|                         cached_sorter, |                         cached_sorter, | ||||||
|                         document_change?, |                         document_change?, | ||||||
|                     ) |                     ) | ||||||
|                 }) |                     .map_err(Arc::new) | ||||||
|             })?; |                 }, | ||||||
|  |             )?; | ||||||
|         } |         } | ||||||
|         { |         { | ||||||
|             let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps); |             let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps); | ||||||
| @@ -248,14 +253,15 @@ impl DocidsExtractor for FacetedDocidsExtractor { | |||||||
|                 tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); |                 tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); | ||||||
|             let _entered = span.enter(); |             let _entered = span.enter(); | ||||||
|  |  | ||||||
|             let readers: Vec<_> = context_pool |             let readers: Vec<_> = caches | ||||||
|                 .into_items() |                 .into_iter() | ||||||
|                 .par_bridge() |                 .par_bridge() | ||||||
|                 .map(|(_rtxn, _tokenizer, _fields_ids_map, cached_sorter)| { |                 .map(|cached_sorter| { | ||||||
|                     let sorter = cached_sorter.into_sorter()?; |                     let sorter = cached_sorter.into_sorter()?; | ||||||
|                     sorter.into_reader_cursors() |                     sorter.into_reader_cursors() | ||||||
|                 }) |                 }) | ||||||
|                 .collect(); |                 .collect(); | ||||||
|  |  | ||||||
|             for reader in readers { |             for reader in readers { | ||||||
|                 builder.extend(reader?); |                 builder.extend(reader?); | ||||||
|             } |             } | ||||||
|   | |||||||
| @@ -4,6 +4,7 @@ mod lru; | |||||||
| mod searchable; | mod searchable; | ||||||
|  |  | ||||||
| use std::fs::File; | use std::fs::File; | ||||||
|  | use std::sync::Arc; | ||||||
|  |  | ||||||
| pub use faceted::*; | pub use faceted::*; | ||||||
| use grenad::Merger; | use grenad::Merger; | ||||||
| @@ -12,14 +13,16 @@ pub use searchable::*; | |||||||
|  |  | ||||||
| use super::DocumentChange; | use super::DocumentChange; | ||||||
| use crate::update::{GrenadParameters, MergeDeladdCboRoaringBitmaps}; | use crate::update::{GrenadParameters, MergeDeladdCboRoaringBitmaps}; | ||||||
| use crate::{GlobalFieldsIdsMap, Index, Result}; | use crate::{Error, GlobalFieldsIdsMap, Index, Result}; | ||||||
|  |  | ||||||
| pub trait DocidsExtractor { | pub trait DocidsExtractor { | ||||||
|     fn run_extraction( |     fn run_extraction( | ||||||
|         index: &Index, |         index: &Index, | ||||||
|         fields_ids_map: &GlobalFieldsIdsMap, |         fields_ids_map: &GlobalFieldsIdsMap, | ||||||
|         indexer: GrenadParameters, |         indexer: GrenadParameters, | ||||||
|         document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>, |         document_changes: impl IntoParallelIterator< | ||||||
|  |             Item = std::result::Result<DocumentChange, Arc<Error>>, | ||||||
|  |         >, | ||||||
|     ) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>>; |     ) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>>; | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -1,18 +1,22 @@ | |||||||
| use std::collections::HashMap; | use std::collections::HashMap; | ||||||
| use std::fs::File; | use std::fs::File; | ||||||
| use std::num::NonZero; | use std::num::NonZero; | ||||||
|  | use std::sync::Arc; | ||||||
|  |  | ||||||
| use grenad::{Merger, MergerBuilder}; | use grenad::{Merger, MergerBuilder}; | ||||||
| use heed::RoTxn; | use heed::RoTxn; | ||||||
| use rayon::iter::{IntoParallelIterator, ParallelIterator}; | use rayon::iter::IntoParallelIterator; | ||||||
|  |  | ||||||
| use super::tokenize_document::{tokenizer_builder, DocumentTokenizer}; | use super::tokenize_document::{tokenizer_builder, DocumentTokenizer}; | ||||||
|  | use super::SearchableExtractor; | ||||||
|  | use crate::update::new::append_only_vec::AppendOnlyVec; | ||||||
| use crate::update::new::extract::cache::CboCachedSorter; | use crate::update::new::extract::cache::CboCachedSorter; | ||||||
| use crate::update::new::extract::perm_json_p::contained_in; | use crate::update::new::extract::perm_json_p::contained_in; | ||||||
| use crate::update::new::{DocumentChange, ItemsPool}; | use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; | ||||||
|  | use crate::update::new::DocumentChange; | ||||||
| use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; | use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; | ||||||
| use crate::{ | use crate::{ | ||||||
|     bucketed_position, DocumentId, FieldId, GlobalFieldsIdsMap, Index, Result, |     bucketed_position, DocumentId, Error, FieldId, GlobalFieldsIdsMap, Index, Result, | ||||||
|     MAX_POSITION_PER_ATTRIBUTE, |     MAX_POSITION_PER_ATTRIBUTE, | ||||||
| }; | }; | ||||||
|  |  | ||||||
| @@ -303,7 +307,9 @@ impl WordDocidsExtractors { | |||||||
|         index: &Index, |         index: &Index, | ||||||
|         fields_ids_map: &GlobalFieldsIdsMap, |         fields_ids_map: &GlobalFieldsIdsMap, | ||||||
|         indexer: GrenadParameters, |         indexer: GrenadParameters, | ||||||
|         document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>, |         document_changes: impl IntoParallelIterator< | ||||||
|  |             Item = std::result::Result<DocumentChange, Arc<Error>>, | ||||||
|  |         >, | ||||||
|     ) -> Result<WordDocidsMergers> { |     ) -> Result<WordDocidsMergers> { | ||||||
|         let max_memory = indexer.max_memory_by_thread(); |         let max_memory = indexer.max_memory_by_thread(); | ||||||
|  |  | ||||||
| @@ -335,36 +341,35 @@ impl WordDocidsExtractors { | |||||||
|             max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE, |             max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE, | ||||||
|         }; |         }; | ||||||
|  |  | ||||||
|         let context_pool = ItemsPool::new(|| { |         let caches = AppendOnlyVec::new(); | ||||||
|             Ok(( |  | ||||||
|                 index.read_txn()?, |  | ||||||
|                 &document_tokenizer, |  | ||||||
|                 fields_ids_map.clone(), |  | ||||||
|                 WordDocidsCachedSorters::new( |  | ||||||
|                     indexer, |  | ||||||
|                     max_memory, |  | ||||||
|                     // TODO use a better value |  | ||||||
|                     200_000.try_into().unwrap(), |  | ||||||
|                 ), |  | ||||||
|             )) |  | ||||||
|         }); |  | ||||||
|  |  | ||||||
|         { |         { | ||||||
|             let span = |             let span = | ||||||
|                 tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); |                 tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); | ||||||
|             let _entered = span.enter(); |             let _entered = span.enter(); | ||||||
|             document_changes.into_par_iter().try_for_each(|document_change| { |             document_changes.into_par_iter().try_arc_for_each_try_init( | ||||||
|                 context_pool.with(|(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| { |                 || { | ||||||
|  |                     let rtxn = index.read_txn().map_err(Error::from)?; | ||||||
|  |                     let cache = caches.push(WordDocidsCachedSorters::new( | ||||||
|  |                         indexer, | ||||||
|  |                         max_memory, | ||||||
|  |                         // TODO use a better value | ||||||
|  |                         200_000.try_into().unwrap(), | ||||||
|  |                     )); | ||||||
|  |                     Ok((rtxn, &document_tokenizer, fields_ids_map.clone(), cache)) | ||||||
|  |                 }, | ||||||
|  |                 |(rtxn, document_tokenizer, fields_ids_map, cached_sorter), document_change| { | ||||||
|                     Self::extract_document_change( |                     Self::extract_document_change( | ||||||
|                         &*rtxn, |                         rtxn, | ||||||
|                         index, |                         index, | ||||||
|                         document_tokenizer, |                         document_tokenizer, | ||||||
|                         fields_ids_map, |                         fields_ids_map, | ||||||
|                         cached_sorter, |                         cached_sorter, | ||||||
|                         document_change?, |                         document_change?, | ||||||
|                     ) |                     ) | ||||||
|                 }) |                     .map_err(Arc::new) | ||||||
|             })?; |                 }, | ||||||
|  |             )?; | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         { |         { | ||||||
| @@ -372,7 +377,7 @@ impl WordDocidsExtractors { | |||||||
|                 tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); |                 tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); | ||||||
|             let _entered = span.enter(); |             let _entered = span.enter(); | ||||||
|             let mut builder = WordDocidsMergerBuilders::new(); |             let mut builder = WordDocidsMergerBuilders::new(); | ||||||
|             for (_rtxn, _tokenizer, _fields_ids_map, cache) in context_pool.into_items() { |             for cache in caches.into_iter() { | ||||||
|                 builder.add_sorters(cache)?; |                 builder.add_sorters(cache)?; | ||||||
|             } |             } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -3,6 +3,7 @@ mod extract_word_pair_proximity_docids; | |||||||
| mod tokenize_document; | mod tokenize_document; | ||||||
|  |  | ||||||
| use std::fs::File; | use std::fs::File; | ||||||
|  | use std::sync::Arc; | ||||||
|  |  | ||||||
| pub use extract_word_docids::{WordDocidsExtractors, WordDocidsMergers}; | pub use extract_word_docids::{WordDocidsExtractors, WordDocidsMergers}; | ||||||
| pub use extract_word_pair_proximity_docids::WordPairProximityDocidsExtractor; | pub use extract_word_pair_proximity_docids::WordPairProximityDocidsExtractor; | ||||||
| @@ -13,16 +14,20 @@ use tokenize_document::{tokenizer_builder, DocumentTokenizer}; | |||||||
|  |  | ||||||
| use super::cache::CboCachedSorter; | use super::cache::CboCachedSorter; | ||||||
| use super::DocidsExtractor; | use super::DocidsExtractor; | ||||||
| use crate::update::new::{DocumentChange, ItemsPool}; | use crate::update::new::append_only_vec::AppendOnlyVec; | ||||||
|  | use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; | ||||||
|  | use crate::update::new::DocumentChange; | ||||||
| use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; | use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps}; | ||||||
| use crate::{GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE}; | use crate::{Error, GlobalFieldsIdsMap, Index, Result, MAX_POSITION_PER_ATTRIBUTE}; | ||||||
|  |  | ||||||
| pub trait SearchableExtractor { | pub trait SearchableExtractor { | ||||||
|     fn run_extraction( |     fn run_extraction( | ||||||
|         index: &Index, |         index: &Index, | ||||||
|         fields_ids_map: &GlobalFieldsIdsMap, |         fields_ids_map: &GlobalFieldsIdsMap, | ||||||
|         indexer: GrenadParameters, |         indexer: GrenadParameters, | ||||||
|         document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>, |         document_changes: impl IntoParallelIterator< | ||||||
|  |             Item = std::result::Result<DocumentChange, Arc<Error>>, | ||||||
|  |         >, | ||||||
|     ) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> { |     ) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> { | ||||||
|         let max_memory = indexer.max_memory_by_thread(); |         let max_memory = indexer.max_memory_by_thread(); | ||||||
|  |  | ||||||
| @@ -53,43 +58,41 @@ pub trait SearchableExtractor { | |||||||
|             localized_attributes_rules: &localized_attributes_rules, |             localized_attributes_rules: &localized_attributes_rules, | ||||||
|             max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE, |             max_positions_per_attributes: MAX_POSITION_PER_ATTRIBUTE, | ||||||
|         }; |         }; | ||||||
|  |         let caches = AppendOnlyVec::new(); | ||||||
|         let context_pool = ItemsPool::new(|| { |  | ||||||
|             Ok(( |  | ||||||
|                 index.read_txn()?, |  | ||||||
|                 &document_tokenizer, |  | ||||||
|                 fields_ids_map.clone(), |  | ||||||
|                 CboCachedSorter::new( |  | ||||||
|                     // TODO use a better value |  | ||||||
|                     1_000_000.try_into().unwrap(), |  | ||||||
|                     create_sorter( |  | ||||||
|                         grenad::SortAlgorithm::Stable, |  | ||||||
|                         MergeDeladdCboRoaringBitmaps, |  | ||||||
|                         indexer.chunk_compression_type, |  | ||||||
|                         indexer.chunk_compression_level, |  | ||||||
|                         indexer.max_nb_chunks, |  | ||||||
|                         max_memory, |  | ||||||
|                     ), |  | ||||||
|                 ), |  | ||||||
|             )) |  | ||||||
|         }); |  | ||||||
|  |  | ||||||
|         { |         { | ||||||
|             let span = |             let span = | ||||||
|                 tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); |                 tracing::trace_span!(target: "indexing::documents::extract", "docids_extraction"); | ||||||
|             let _entered = span.enter(); |             let _entered = span.enter(); | ||||||
|             document_changes.into_par_iter().try_for_each(|document_change| { |             document_changes.into_par_iter().try_arc_for_each_try_init( | ||||||
|                 context_pool.with(|(rtxn, document_tokenizer, fields_ids_map, cached_sorter)| { |                 || { | ||||||
|  |                     let rtxn = index.read_txn().map_err(Error::from)?; | ||||||
|  |                     let cache = caches.push(CboCachedSorter::new( | ||||||
|  |                         // TODO use a better value | ||||||
|  |                         1_000_000.try_into().unwrap(), | ||||||
|  |                         create_sorter( | ||||||
|  |                             grenad::SortAlgorithm::Stable, | ||||||
|  |                             MergeDeladdCboRoaringBitmaps, | ||||||
|  |                             indexer.chunk_compression_type, | ||||||
|  |                             indexer.chunk_compression_level, | ||||||
|  |                             indexer.max_nb_chunks, | ||||||
|  |                             max_memory, | ||||||
|  |                         ), | ||||||
|  |                     )); | ||||||
|  |                     Ok((rtxn, &document_tokenizer, fields_ids_map.clone(), cache)) | ||||||
|  |                 }, | ||||||
|  |                 |(rtxn, document_tokenizer, fields_ids_map, cached_sorter), document_change| { | ||||||
|                     Self::extract_document_change( |                     Self::extract_document_change( | ||||||
|                         &*rtxn, |                         rtxn, | ||||||
|                         index, |                         index, | ||||||
|                         document_tokenizer, |                         document_tokenizer, | ||||||
|                         fields_ids_map, |                         fields_ids_map, | ||||||
|                         cached_sorter, |                         cached_sorter, | ||||||
|                         document_change?, |                         document_change?, | ||||||
|                     ) |                     ) | ||||||
|                 }) |                     .map_err(Arc::new) | ||||||
|             })?; |                 }, | ||||||
|  |             )?; | ||||||
|         } |         } | ||||||
|         { |         { | ||||||
|             let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps); |             let mut builder = grenad::MergerBuilder::new(MergeDeladdCboRoaringBitmaps); | ||||||
| @@ -97,14 +100,15 @@ pub trait SearchableExtractor { | |||||||
|                 tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); |                 tracing::trace_span!(target: "indexing::documents::extract", "merger_building"); | ||||||
|             let _entered = span.enter(); |             let _entered = span.enter(); | ||||||
|  |  | ||||||
|             let readers: Vec<_> = context_pool |             let readers: Vec<_> = caches | ||||||
|                 .into_items() |                 .into_iter() | ||||||
|                 .par_bridge() |                 .par_bridge() | ||||||
|                 .map(|(_rtxn, _tokenizer, _fields_ids_map, cached_sorter)| { |                 .map(|cached_sorter| { | ||||||
|                     let sorter = cached_sorter.into_sorter()?; |                     let sorter = cached_sorter.into_sorter()?; | ||||||
|                     sorter.into_reader_cursors() |                     sorter.into_reader_cursors() | ||||||
|                 }) |                 }) | ||||||
|                 .collect(); |                 .collect(); | ||||||
|  |  | ||||||
|             for reader in readers { |             for reader in readers { | ||||||
|                 builder.extend(reader?); |                 builder.extend(reader?); | ||||||
|             } |             } | ||||||
| @@ -132,7 +136,9 @@ impl<T: SearchableExtractor> DocidsExtractor for T { | |||||||
|         index: &Index, |         index: &Index, | ||||||
|         fields_ids_map: &GlobalFieldsIdsMap, |         fields_ids_map: &GlobalFieldsIdsMap, | ||||||
|         indexer: GrenadParameters, |         indexer: GrenadParameters, | ||||||
|         document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>, |         document_changes: impl IntoParallelIterator< | ||||||
|  |             Item = std::result::Result<DocumentChange, Arc<Error>>, | ||||||
|  |         >, | ||||||
|     ) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> { |     ) -> Result<Merger<File, MergeDeladdCboRoaringBitmaps>> { | ||||||
|         Self::run_extraction(index, fields_ids_map, indexer, document_changes) |         Self::run_extraction(index, fields_ids_map, indexer, document_changes) | ||||||
|     } |     } | ||||||
|   | |||||||
| @@ -1,11 +1,12 @@ | |||||||
| use std::sync::Arc; | use std::sync::Arc; | ||||||
|  |  | ||||||
| use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; | use rayon::iter::{IndexedParallelIterator, IntoParallelIterator}; | ||||||
| use roaring::RoaringBitmap; | use roaring::RoaringBitmap; | ||||||
|  |  | ||||||
| use super::DocumentChanges; | use super::DocumentChanges; | ||||||
| use crate::update::new::{Deletion, DocumentChange, ItemsPool}; | use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _; | ||||||
| use crate::{FieldsIdsMap, Index, Result}; | use crate::update::new::{Deletion, DocumentChange}; | ||||||
|  | use crate::{Error, FieldsIdsMap, Index, Result}; | ||||||
|  |  | ||||||
| pub struct DocumentDeletion { | pub struct DocumentDeletion { | ||||||
|     pub to_delete: RoaringBitmap, |     pub to_delete: RoaringBitmap, | ||||||
| @@ -28,15 +29,19 @@ impl<'p> DocumentChanges<'p> for DocumentDeletion { | |||||||
|         self, |         self, | ||||||
|         _fields_ids_map: &mut FieldsIdsMap, |         _fields_ids_map: &mut FieldsIdsMap, | ||||||
|         param: Self::Parameter, |         param: Self::Parameter, | ||||||
|     ) -> Result<impl IndexedParallelIterator<Item = Result<DocumentChange>> + Clone + 'p> { |     ) -> Result< | ||||||
|  |         impl IndexedParallelIterator<Item = std::result::Result<DocumentChange, Arc<Error>>> | ||||||
|  |             + Clone | ||||||
|  |             + 'p, | ||||||
|  |     > { | ||||||
|         let index = param; |         let index = param; | ||||||
|         let items = Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from))); |  | ||||||
|         let to_delete: Vec<_> = self.to_delete.into_iter().collect(); |         let to_delete: Vec<_> = self.to_delete.into_iter().collect(); | ||||||
|         Ok(to_delete.into_par_iter().map_with(items, |items, docid| { |         Ok(to_delete.into_par_iter().try_map_try_init( | ||||||
|             items.with(|rtxn| { |             || index.read_txn().map_err(crate::Error::from), | ||||||
|  |             |rtxn, docid| { | ||||||
|                 let current = index.document(rtxn, docid)?; |                 let current = index.document(rtxn, docid)?; | ||||||
|                 Ok(DocumentChange::Deletion(Deletion::create(docid, current.boxed()))) |                 Ok(DocumentChange::Deletion(Deletion::create(docid, current.boxed()))) | ||||||
|             }) |             }, | ||||||
|         })) |         )) | ||||||
|     } |     } | ||||||
| } | } | ||||||
|   | |||||||
| @@ -5,14 +5,14 @@ use std::sync::Arc; | |||||||
| use heed::types::Bytes; | use heed::types::Bytes; | ||||||
| use heed::RoTxn; | use heed::RoTxn; | ||||||
| use memmap2::Mmap; | use memmap2::Mmap; | ||||||
| use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; | use rayon::iter::{IndexedParallelIterator, IntoParallelIterator}; | ||||||
| use IndexDocumentsMethod as Idm; | use IndexDocumentsMethod as Idm; | ||||||
|  |  | ||||||
| use super::super::document_change::DocumentChange; | use super::super::document_change::DocumentChange; | ||||||
| use super::super::items_pool::ItemsPool; |  | ||||||
| use super::super::{CowStr, TopLevelMap}; | use super::super::{CowStr, TopLevelMap}; | ||||||
| use super::DocumentChanges; | use super::DocumentChanges; | ||||||
| use crate::documents::{DocumentIdExtractionError, PrimaryKey}; | use crate::documents::{DocumentIdExtractionError, PrimaryKey}; | ||||||
|  | use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _; | ||||||
| use crate::update::new::{Deletion, Insertion, KvReaderFieldId, KvWriterFieldId, Update}; | use crate::update::new::{Deletion, Insertion, KvReaderFieldId, KvWriterFieldId, Update}; | ||||||
| use crate::update::{AvailableIds, IndexDocumentsMethod}; | use crate::update::{AvailableIds, IndexDocumentsMethod}; | ||||||
| use crate::{DocumentId, Error, FieldsIdsMap, Index, Result, UserError}; | use crate::{DocumentId, Error, FieldsIdsMap, Index, Result, UserError}; | ||||||
| @@ -73,7 +73,11 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> { | |||||||
|         self, |         self, | ||||||
|         fields_ids_map: &mut FieldsIdsMap, |         fields_ids_map: &mut FieldsIdsMap, | ||||||
|         param: Self::Parameter, |         param: Self::Parameter, | ||||||
|     ) -> Result<impl IndexedParallelIterator<Item = Result<DocumentChange>> + Clone + 'p> { |     ) -> Result< | ||||||
|  |         impl IndexedParallelIterator<Item = std::result::Result<DocumentChange, Arc<Error>>> | ||||||
|  |             + Clone | ||||||
|  |             + 'p, | ||||||
|  |     > { | ||||||
|         let (index, rtxn, primary_key) = param; |         let (index, rtxn, primary_key) = param; | ||||||
|  |  | ||||||
|         let documents_ids = index.documents_ids(rtxn)?; |         let documents_ids = index.documents_ids(rtxn)?; | ||||||
| @@ -199,24 +203,22 @@ impl<'p, 'pl: 'p> DocumentChanges<'p> for DocumentOperation<'pl> { | |||||||
|         // And finally sort them |         // And finally sort them | ||||||
|         docids_version_offsets.sort_unstable_by_key(|(_, (_, docops))| sort_function_key(docops)); |         docids_version_offsets.sort_unstable_by_key(|(_, (_, docops))| sort_function_key(docops)); | ||||||
|  |  | ||||||
|         Ok(docids_version_offsets.into_par_iter().map_with( |         Ok(docids_version_offsets.into_par_iter().try_map_try_init( | ||||||
|             Arc::new(ItemsPool::new(|| index.read_txn().map_err(crate::Error::from))), |             || index.read_txn().map_err(Error::from), | ||||||
|             move |context_pool, (external_docid, (internal_docid, operations))| { |             move |rtxn, (external_docid, (internal_docid, operations))| { | ||||||
|                 context_pool.with(|rtxn| { |                 let document_merge_function = match self.index_documents_method { | ||||||
|                     let document_merge_function = match self.index_documents_method { |                     Idm::ReplaceDocuments => MergeDocumentForReplacement::merge, | ||||||
|                         Idm::ReplaceDocuments => MergeDocumentForReplacement::merge, |                     Idm::UpdateDocuments => MergeDocumentForUpdates::merge, | ||||||
|                         Idm::UpdateDocuments => MergeDocumentForUpdates::merge, |                 }; | ||||||
|                     }; |  | ||||||
|  |  | ||||||
|                     document_merge_function( |                 document_merge_function( | ||||||
|                         rtxn, |                     rtxn, | ||||||
|                         index, |                     index, | ||||||
|                         &fields_ids_map, |                     &fields_ids_map, | ||||||
|                         internal_docid, |                     internal_docid, | ||||||
|                         external_docid.to_string(), // TODO do not clone |                     external_docid.to_string(), // TODO do not clone | ||||||
|                         &operations, |                     &operations, | ||||||
|                     ) |                 ) | ||||||
|                 }) |  | ||||||
|             }, |             }, | ||||||
|         )) |         )) | ||||||
|     } |     } | ||||||
|   | |||||||
| @@ -1,4 +1,4 @@ | |||||||
| use std::sync::RwLock; | use std::sync::{Arc, RwLock}; | ||||||
| use std::thread::{self, Builder}; | use std::thread::{self, Builder}; | ||||||
|  |  | ||||||
| use big_s::S; | use big_s::S; | ||||||
| @@ -22,8 +22,9 @@ use super::{StdResult, TopLevelMap}; | |||||||
| use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; | use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; | ||||||
| use crate::update::new::channel::ExtractorSender; | use crate::update::new::channel::ExtractorSender; | ||||||
| use crate::update::settings::InnerIndexSettings; | use crate::update::settings::InnerIndexSettings; | ||||||
|  | use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; | ||||||
| use crate::update::GrenadParameters; | use crate::update::GrenadParameters; | ||||||
| use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; | use crate::{Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, Result, UserError}; | ||||||
|  |  | ||||||
| mod document_deletion; | mod document_deletion; | ||||||
| mod document_operation; | mod document_operation; | ||||||
| @@ -37,7 +38,11 @@ pub trait DocumentChanges<'p> { | |||||||
|         self, |         self, | ||||||
|         fields_ids_map: &mut FieldsIdsMap, |         fields_ids_map: &mut FieldsIdsMap, | ||||||
|         param: Self::Parameter, |         param: Self::Parameter, | ||||||
|     ) -> Result<impl IndexedParallelIterator<Item = Result<DocumentChange>> + Clone + 'p>; |     ) -> Result< | ||||||
|  |         impl IndexedParallelIterator<Item = std::result::Result<DocumentChange, Arc<Error>>> | ||||||
|  |             + Clone | ||||||
|  |             + 'p, | ||||||
|  |     >; | ||||||
| } | } | ||||||
|  |  | ||||||
| /// This is the main function of this crate. | /// This is the main function of this crate. | ||||||
| @@ -53,7 +58,9 @@ pub fn index<PI>( | |||||||
|     document_changes: PI, |     document_changes: PI, | ||||||
| ) -> Result<()> | ) -> Result<()> | ||||||
| where | where | ||||||
|     PI: IndexedParallelIterator<Item = Result<DocumentChange>> + Send + Clone, |     PI: IndexedParallelIterator<Item = std::result::Result<DocumentChange, Arc<Error>>> | ||||||
|  |         + Send | ||||||
|  |         + Clone, | ||||||
| { | { | ||||||
|     let (merger_sender, writer_receiver) = merger_writer_channel(10_000); |     let (merger_sender, writer_receiver) = merger_writer_channel(10_000); | ||||||
|     // This channel acts as a rendezvous point to ensure that we are one task ahead |     // This channel acts as a rendezvous point to ensure that we are one task ahead | ||||||
| @@ -74,7 +81,8 @@ where | |||||||
|  |  | ||||||
|                     // document but we need to create a function that collects and compresses documents. |                     // document but we need to create a function that collects and compresses documents. | ||||||
|                     let document_sender = extractor_sender.document_sender(); |                     let document_sender = extractor_sender.document_sender(); | ||||||
|                     document_changes.clone().into_par_iter().try_for_each(|result| { |                     document_changes.clone().into_par_iter().try_arc_for_each::<_, Error>( | ||||||
|  |                         |result| { | ||||||
|                         match result? { |                         match result? { | ||||||
|                             DocumentChange::Deletion(deletion) => { |                             DocumentChange::Deletion(deletion) => { | ||||||
|                                 let docid = deletion.docid(); |                                 let docid = deletion.docid(); | ||||||
| @@ -92,7 +100,7 @@ where | |||||||
|                                 // extracted_dictionary_sender.send(self, dictionary: &[u8]); |                                 // extracted_dictionary_sender.send(self, dictionary: &[u8]); | ||||||
|                             } |                             } | ||||||
|                         } |                         } | ||||||
|                         Ok(()) as Result<_> |                         Ok(()) | ||||||
|                     })?; |                     })?; | ||||||
|  |  | ||||||
|                     document_sender.finish().unwrap(); |                     document_sender.finish().unwrap(); | ||||||
| @@ -242,7 +250,7 @@ fn extract_and_send_docids<E: DocidsExtractor, D: MergerOperationType>( | |||||||
|     index: &Index, |     index: &Index, | ||||||
|     fields_ids_map: &GlobalFieldsIdsMap, |     fields_ids_map: &GlobalFieldsIdsMap, | ||||||
|     indexer: GrenadParameters, |     indexer: GrenadParameters, | ||||||
|     document_changes: impl IntoParallelIterator<Item = Result<DocumentChange>>, |     document_changes: impl IntoParallelIterator<Item = std::result::Result<DocumentChange, Arc<Error>>>, | ||||||
|     sender: &ExtractorSender, |     sender: &ExtractorSender, | ||||||
| ) -> Result<()> { | ) -> Result<()> { | ||||||
|     let merger = E::run_extraction(index, fields_ids_map, indexer, document_changes)?; |     let merger = E::run_extraction(index, fields_ids_map, indexer, document_changes)?; | ||||||
|   | |||||||
| @@ -1,8 +1,11 @@ | |||||||
|  | use std::sync::Arc; | ||||||
|  |  | ||||||
| use rayon::iter::IndexedParallelIterator; | use rayon::iter::IndexedParallelIterator; | ||||||
|  |  | ||||||
| use super::DocumentChanges; | use super::DocumentChanges; | ||||||
| use crate::documents::{DocumentIdExtractionError, PrimaryKey}; | use crate::documents::{DocumentIdExtractionError, PrimaryKey}; | ||||||
| use crate::update::concurrent_available_ids::ConcurrentAvailableIds; | use crate::update::concurrent_available_ids::ConcurrentAvailableIds; | ||||||
|  | use crate::update::new::parallel_iterator_ext::ParallelIteratorExt; | ||||||
| use crate::update::new::{DocumentChange, Insertion, KvWriterFieldId}; | use crate::update::new::{DocumentChange, Insertion, KvWriterFieldId}; | ||||||
| use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError}; | use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError}; | ||||||
|  |  | ||||||
| @@ -30,44 +33,53 @@ where | |||||||
|         self, |         self, | ||||||
|         _fields_ids_map: &mut FieldsIdsMap, |         _fields_ids_map: &mut FieldsIdsMap, | ||||||
|         param: Self::Parameter, |         param: Self::Parameter, | ||||||
|     ) -> Result<impl IndexedParallelIterator<Item = Result<DocumentChange>> + Clone + 'p> { |     ) -> Result< | ||||||
|  |         impl IndexedParallelIterator<Item = std::result::Result<DocumentChange, Arc<Error>>> | ||||||
|  |             + Clone | ||||||
|  |             + 'p, | ||||||
|  |     > { | ||||||
|         let (fields_ids_map, concurrent_available_ids, primary_key) = param; |         let (fields_ids_map, concurrent_available_ids, primary_key) = param; | ||||||
|  |  | ||||||
|         Ok(self.iter.map(|object| { |         Ok(self.iter.try_map_try_init( | ||||||
|             let docid = match concurrent_available_ids.next() { |             || Ok(()), | ||||||
|                 Some(id) => id, |             |_, object| { | ||||||
|                 None => return Err(Error::UserError(UserError::DocumentLimitReached)), |                 let docid = match concurrent_available_ids.next() { | ||||||
|             }; |                     Some(id) => id, | ||||||
|  |                     None => return Err(Error::UserError(UserError::DocumentLimitReached)), | ||||||
|  |                 }; | ||||||
|  |  | ||||||
|             let mut writer = KvWriterFieldId::memory(); |                 let mut writer = KvWriterFieldId::memory(); | ||||||
|             object.iter().for_each(|(key, value)| { |                 object.iter().for_each(|(key, value)| { | ||||||
|                 let key = fields_ids_map.id(key).unwrap(); |                     let key = fields_ids_map.id(key).unwrap(); | ||||||
|                 /// TODO better error management |                     /// TODO better error management | ||||||
|                 let value = serde_json::to_vec(&value).unwrap(); |                     let value = serde_json::to_vec(&value).unwrap(); | ||||||
|                 /// TODO it is not ordered |                     /// TODO it is not ordered | ||||||
|                 writer.insert(key, value).unwrap(); |                     writer.insert(key, value).unwrap(); | ||||||
|             }); |                 }); | ||||||
|  |  | ||||||
|             let document = writer.into_boxed(); |                 let document = writer.into_boxed(); | ||||||
|             let external_docid = match primary_key.document_id(&document, fields_ids_map)? { |                 let external_docid = match primary_key.document_id(&document, fields_ids_map)? { | ||||||
|                 Ok(document_id) => Ok(document_id), |                     Ok(document_id) => Ok(document_id), | ||||||
|                 Err(DocumentIdExtractionError::InvalidDocumentId(user_error)) => Err(user_error), |                     Err(DocumentIdExtractionError::InvalidDocumentId(user_error)) => { | ||||||
|                 Err(DocumentIdExtractionError::MissingDocumentId) => { |                         Err(user_error) | ||||||
|                     Err(UserError::MissingDocumentId { |                     } | ||||||
|                         primary_key: primary_key.name().to_string(), |                     Err(DocumentIdExtractionError::MissingDocumentId) => { | ||||||
|                         document: all_obkv_to_json(&document, fields_ids_map)?, |                         Err(UserError::MissingDocumentId { | ||||||
|                     }) |                             primary_key: primary_key.name().to_string(), | ||||||
|                 } |                             document: all_obkv_to_json(&document, fields_ids_map)?, | ||||||
|                 Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => { |                         }) | ||||||
|                     Err(UserError::TooManyDocumentIds { |                     } | ||||||
|                         primary_key: primary_key.name().to_string(), |                     Err(DocumentIdExtractionError::TooManyDocumentIds(_)) => { | ||||||
|                         document: all_obkv_to_json(&document, fields_ids_map)?, |                         Err(UserError::TooManyDocumentIds { | ||||||
|                     }) |                             primary_key: primary_key.name().to_string(), | ||||||
|                 } |                             document: all_obkv_to_json(&document, fields_ids_map)?, | ||||||
|             }?; |                         }) | ||||||
|  |                     } | ||||||
|  |                 }?; | ||||||
|  |  | ||||||
|             let insertion = Insertion::create(docid, document); |                 let insertion = Insertion::create(docid, document); | ||||||
|             Ok(DocumentChange::Insertion(insertion)) |                 Ok(DocumentChange::Insertion(insertion)) | ||||||
|         })) |             }, | ||||||
|  |         )) | ||||||
|     } |     } | ||||||
| } | } | ||||||
|   | |||||||
| @@ -1,8 +1,10 @@ | |||||||
|  | use std::sync::Arc; | ||||||
|  |  | ||||||
| use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; | use rayon::iter::{IndexedParallelIterator, IntoParallelIterator, ParallelIterator}; | ||||||
|  |  | ||||||
| use super::DocumentChanges; | use super::DocumentChanges; | ||||||
| use crate::update::new::DocumentChange; | use crate::update::new::DocumentChange; | ||||||
| use crate::{FieldsIdsMap, Result}; | use crate::{Error, FieldsIdsMap, Result}; | ||||||
|  |  | ||||||
| pub struct UpdateByFunction; | pub struct UpdateByFunction; | ||||||
|  |  | ||||||
| @@ -13,7 +15,11 @@ impl<'p> DocumentChanges<'p> for UpdateByFunction { | |||||||
|         self, |         self, | ||||||
|         _fields_ids_map: &mut FieldsIdsMap, |         _fields_ids_map: &mut FieldsIdsMap, | ||||||
|         _param: Self::Parameter, |         _param: Self::Parameter, | ||||||
|     ) -> Result<impl IndexedParallelIterator<Item = Result<DocumentChange>> + Clone + 'p> { |     ) -> Result< | ||||||
|  |         impl IndexedParallelIterator<Item = std::result::Result<DocumentChange, Arc<Error>>> | ||||||
|  |             + Clone | ||||||
|  |             + 'p, | ||||||
|  |     > { | ||||||
|         Ok((0..100).into_par_iter().map(|_| todo!())) |         Ok((0..100).into_par_iter().map(|_| todo!())) | ||||||
|     } |     } | ||||||
| } | } | ||||||
|   | |||||||
| @@ -1,54 +0,0 @@ | |||||||
| use crossbeam_channel::{Receiver, Sender, TryRecvError}; |  | ||||||
|  |  | ||||||
| /// A pool of items that can be pull and generated on demand. |  | ||||||
| pub struct ItemsPool<F, T, E> |  | ||||||
| where |  | ||||||
|     F: Fn() -> Result<T, E>, |  | ||||||
| { |  | ||||||
|     init: F, |  | ||||||
|     sender: Sender<T>, |  | ||||||
|     receiver: Receiver<T>, |  | ||||||
| } |  | ||||||
|  |  | ||||||
| impl<F, T, E> ItemsPool<F, T, E> |  | ||||||
| where |  | ||||||
|     F: Fn() -> Result<T, E>, |  | ||||||
| { |  | ||||||
|     /// Create a new unbounded items pool with the specified function |  | ||||||
|     /// to generate items when needed. |  | ||||||
|     /// |  | ||||||
|     /// The `init` function will be invoked whenever a call to `with` requires new items. |  | ||||||
|     pub fn new(init: F) -> Self { |  | ||||||
|         let (sender, receiver) = crossbeam_channel::unbounded(); |  | ||||||
|         ItemsPool { init, sender, receiver } |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     /// Consumes the pool to retrieve all remaining items. |  | ||||||
|     /// |  | ||||||
|     /// This method is useful for cleaning up and managing the items once they are no longer needed. |  | ||||||
|     pub fn into_items(self) -> crossbeam_channel::IntoIter<T> { |  | ||||||
|         self.receiver.into_iter() |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     /// Allows running a function on an item from the pool, |  | ||||||
|     /// potentially generating a new item if the pool is empty. |  | ||||||
|     pub fn with<G, R>(&self, f: G) -> Result<R, E> |  | ||||||
|     where |  | ||||||
|         G: FnOnce(&mut T) -> Result<R, E>, |  | ||||||
|     { |  | ||||||
|         let mut item = match self.receiver.try_recv() { |  | ||||||
|             Ok(item) => item, |  | ||||||
|             Err(TryRecvError::Empty) => (self.init)()?, |  | ||||||
|             Err(TryRecvError::Disconnected) => unreachable!(), |  | ||||||
|         }; |  | ||||||
|  |  | ||||||
|         // Run the user's closure with the retrieved item |  | ||||||
|         let result = f(&mut item); |  | ||||||
|  |  | ||||||
|         if let Err(e) = self.sender.send(item) { |  | ||||||
|             unreachable!("error when sending into channel {e}"); |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         result |  | ||||||
|     } |  | ||||||
| } |  | ||||||
| @@ -1,16 +1,16 @@ | |||||||
| pub use document_change::{Deletion, DocumentChange, Insertion, Update}; | pub use document_change::{Deletion, DocumentChange, Insertion, Update}; | ||||||
| pub use items_pool::ItemsPool; |  | ||||||
| pub use top_level_map::{CowStr, TopLevelMap}; | pub use top_level_map::{CowStr, TopLevelMap}; | ||||||
|  |  | ||||||
| use super::del_add::DelAdd; | use super::del_add::DelAdd; | ||||||
| use crate::FieldId; | use crate::FieldId; | ||||||
|  |  | ||||||
|  | mod append_only_vec; | ||||||
| mod channel; | mod channel; | ||||||
| mod document_change; | mod document_change; | ||||||
| mod extract; | mod extract; | ||||||
| pub mod indexer; | pub mod indexer; | ||||||
| mod items_pool; |  | ||||||
| mod merger; | mod merger; | ||||||
|  | mod parallel_iterator_ext; | ||||||
| mod top_level_map; | mod top_level_map; | ||||||
| mod word_fst_builder; | mod word_fst_builder; | ||||||
| mod words_prefix_docids; | mod words_prefix_docids; | ||||||
|   | |||||||
							
								
								
									
										74
									
								
								milli/src/update/new/parallel_iterator_ext.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										74
									
								
								milli/src/update/new/parallel_iterator_ext.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,74 @@ | |||||||
|  | use std::sync::Arc; | ||||||
|  |  | ||||||
|  | use rayon::iter::{MapInit, ParallelIterator}; | ||||||
|  |  | ||||||
|  | pub trait ParallelIteratorExt: ParallelIterator { | ||||||
|  |     /// Maps items based on the init function. | ||||||
|  |     /// | ||||||
|  |     /// The init function is ran only as necessary which is basically once by thread. | ||||||
|  |     fn try_map_try_init<F, INIT, T, E, R>( | ||||||
|  |         self, | ||||||
|  |         init: INIT, | ||||||
|  |         map_op: F, | ||||||
|  |     ) -> MapInit< | ||||||
|  |         Self, | ||||||
|  |         impl Fn() -> Result<T, Arc<E>> + Sync + Send + Clone, | ||||||
|  |         impl Fn(&mut Result<T, Arc<E>>, Self::Item) -> Result<R, Arc<E>> + Sync + Send + Clone, | ||||||
|  |     > | ||||||
|  |     where | ||||||
|  |         E: Send + Sync, | ||||||
|  |         F: Fn(&mut T, Self::Item) -> Result<R, E> + Sync + Send + Clone, | ||||||
|  |         INIT: Fn() -> Result<T, E> + Sync + Send + Clone, | ||||||
|  |         R: Send, | ||||||
|  |     { | ||||||
|  |         self.map_init( | ||||||
|  |             move || match init() { | ||||||
|  |                 Ok(t) => Ok(t), | ||||||
|  |                 Err(err) => Err(Arc::new(err)), | ||||||
|  |             }, | ||||||
|  |             move |result, item| match result { | ||||||
|  |                 Ok(t) => map_op(t, item).map_err(Arc::new), | ||||||
|  |                 Err(err) => Err(err.clone()), | ||||||
|  |             }, | ||||||
|  |         ) | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     /// A method to run a closure of all the items and return an owned error. | ||||||
|  |     /// | ||||||
|  |     /// The init function is ran only as necessary which is basically once by thread. | ||||||
|  |     fn try_arc_for_each_try_init<F, INIT, T, E>(self, init: INIT, op: F) -> Result<(), E> | ||||||
|  |     where | ||||||
|  |         E: Send + Sync, | ||||||
|  |         F: Fn(&mut T, Self::Item) -> Result<(), Arc<E>> + Sync + Send + Clone, | ||||||
|  |         INIT: Fn() -> Result<T, E> + Sync + Send + Clone, | ||||||
|  |     { | ||||||
|  |         let result = self.try_for_each_init( | ||||||
|  |             move || match init() { | ||||||
|  |                 Ok(t) => Ok(t), | ||||||
|  |                 Err(err) => Err(Arc::new(err)), | ||||||
|  |             }, | ||||||
|  |             move |result, item| match result { | ||||||
|  |                 Ok(t) => op(t, item), | ||||||
|  |                 Err(err) => Err(err.clone()), | ||||||
|  |             }, | ||||||
|  |         ); | ||||||
|  |  | ||||||
|  |         match result { | ||||||
|  |             Ok(()) => Ok(()), | ||||||
|  |             Err(err) => Err(Arc::into_inner(err).expect("the error must be only owned by us")), | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     fn try_arc_for_each<F, E>(self, op: F) -> Result<(), E> | ||||||
|  |     where | ||||||
|  |         E: Send + Sync, | ||||||
|  |         F: Fn(Self::Item) -> Result<(), Arc<E>> + Sync + Send + Clone, | ||||||
|  |     { | ||||||
|  |         match self.try_for_each(op) { | ||||||
|  |             Ok(()) => Ok(()), | ||||||
|  |             Err(err) => Err(Arc::into_inner(err).expect("the error must be only owned by us")), | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<T: ParallelIterator> ParallelIteratorExt for T {} | ||||||
		Reference in New Issue
	
	Block a user