mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-31 07:56:28 +00:00 
			
		
		
		
	Fix Pr comments
This commit is contained in:
		| @@ -6,7 +6,6 @@ edition = "2018" | ||||
|  | ||||
| [dependencies] | ||||
| bstr = "0.2.15" | ||||
| byte-unit = { version = "4.0.9", default-features = false, features = ["std"] } | ||||
| byteorder = "1.4.2" | ||||
| chrono = { version = "0.4.19", features = ["serde"] } | ||||
| concat-arrays = "0.1.2" | ||||
|   | ||||
| @@ -5,6 +5,7 @@ use std::{marker, str}; | ||||
| use crate::error::SerializationError; | ||||
| use crate::heed_codec::RoaringBitmapCodec; | ||||
| use crate::{try_split_array_at, try_split_at, Result}; | ||||
|  | ||||
| pub type FacetStringLevelZeroValueCodec = StringValueCodec<RoaringBitmapCodec>; | ||||
|  | ||||
| /// A codec that encodes a string in front of a value. | ||||
| @@ -22,7 +23,6 @@ where | ||||
|  | ||||
|     fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> { | ||||
|         let (string, bytes) = decode_prefix_string(bytes)?; | ||||
|  | ||||
|         C::bytes_decode(bytes).map(|item| (string, item)) | ||||
|     } | ||||
| } | ||||
| @@ -49,7 +49,6 @@ pub fn decode_prefix_string(value: &[u8]) -> Option<(&str, &[u8])> { | ||||
|     let original_length = u16::from_be_bytes(original_length_bytes) as usize; | ||||
|     let (string, bytes) = try_split_at(bytes, original_length)?; | ||||
|     let string = str::from_utf8(string).ok()?; | ||||
|  | ||||
|     Some((string, bytes)) | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -55,9 +55,9 @@ impl CboRoaringBitmapCodec { | ||||
|  | ||||
|     /// Merge serialized CboRoaringBitmaps in a buffer. | ||||
|     /// | ||||
|     /// if the merged values len is under the threshold, | ||||
|     /// values are directly serialized in the buffer; | ||||
|     /// else a RoaringBitmap is created from the values and is serialized in the buffer. | ||||
|     /// if the merged values length is under the threshold, values are directly | ||||
|     /// serialized in the buffer else a RoaringBitmap is created from the | ||||
|     /// values and is serialized in the buffer. | ||||
|     pub fn merge_into(slices: &[Cow<[u8]>], buffer: &mut Vec<u8>) -> io::Result<()> { | ||||
|         let mut roaring = RoaringBitmap::new(); | ||||
|         let mut vec = Vec::new(); | ||||
|   | ||||
| @@ -58,11 +58,12 @@ pub fn extract_fid_docid_facet_values<R: io::Read>( | ||||
|                 // insert facet numbers in sorter | ||||
|                 for number in numbers { | ||||
|                     key_buffer.truncate(size_of::<FieldId>() + size_of::<DocumentId>()); | ||||
|                     let value_bytes = f64_into_bytes(number).unwrap(); // invalid float | ||||
|                     key_buffer.extend_from_slice(&value_bytes); | ||||
|                     key_buffer.extend_from_slice(&number.to_be_bytes()); | ||||
|                     if let Some(value_bytes) = f64_into_bytes(number) { | ||||
|                         key_buffer.extend_from_slice(&value_bytes); | ||||
|                         key_buffer.extend_from_slice(&number.to_be_bytes()); | ||||
|  | ||||
|                     fid_docid_facet_numbers_sorter.insert(&key_buffer, ().as_bytes())?; | ||||
|                         fid_docid_facet_numbers_sorter.insert(&key_buffer, ().as_bytes())?; | ||||
|                     } | ||||
|                 } | ||||
|  | ||||
|                 // insert  normalized and original facet string in sorter | ||||
|   | ||||
| @@ -8,6 +8,8 @@ use super::helpers::{ | ||||
|     create_sorter, merge_cbo_roaring_bitmaps, read_u32_ne_bytes, sorter_into_reader, | ||||
|     try_split_array_at, GrenadParameters, MergeFn, | ||||
| }; | ||||
| use crate::error::SerializationError; | ||||
| use crate::index::db_name::DOCID_WORD_POSITIONS; | ||||
| use crate::proximity::extract_position; | ||||
| use crate::{DocumentId, FieldId, Result}; | ||||
|  | ||||
| @@ -36,7 +38,8 @@ pub fn extract_fid_word_count_docids<R: io::Read>( | ||||
|     let mut current_document_id = None; | ||||
|  | ||||
|     while let Some((key, value)) = docid_word_positions.next()? { | ||||
|         let (document_id_bytes, _word_bytes) = try_split_array_at(key).unwrap(); | ||||
|         let (document_id_bytes, _word_bytes) = try_split_array_at(key) | ||||
|             .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; | ||||
|         let document_id = u32::from_be_bytes(document_id_bytes); | ||||
|  | ||||
|         let curr_document_id = *current_document_id.get_or_insert(document_id); | ||||
|   | ||||
| @@ -8,6 +8,8 @@ use super::helpers::{ | ||||
|     create_sorter, merge_roaring_bitmaps, serialize_roaring_bitmap, sorter_into_reader, | ||||
|     try_split_array_at, GrenadParameters, | ||||
| }; | ||||
| use crate::error::SerializationError; | ||||
| use crate::index::db_name::DOCID_WORD_POSITIONS; | ||||
| use crate::Result; | ||||
|  | ||||
| /// Extracts the word and the documents ids where this word appear. | ||||
| @@ -31,7 +33,8 @@ pub fn extract_word_docids<R: io::Read>( | ||||
|  | ||||
|     let mut value_buffer = Vec::new(); | ||||
|     while let Some((key, _value)) = docid_word_positions.next()? { | ||||
|         let (document_id_bytes, word_bytes) = try_split_array_at(key).unwrap(); | ||||
|         let (document_id_bytes, word_bytes) = try_split_array_at(key) | ||||
|             .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; | ||||
|         let document_id = u32::from_be_bytes(document_id_bytes); | ||||
|  | ||||
|         let bitmap = RoaringBitmap::from_iter(Some(document_id)); | ||||
|   | ||||
| @@ -5,7 +5,10 @@ use super::helpers::{ | ||||
|     create_sorter, merge_cbo_roaring_bitmaps, read_u32_ne_bytes, sorter_into_reader, | ||||
|     try_split_array_at, GrenadParameters, | ||||
| }; | ||||
| use crate::error::SerializationError; | ||||
| use crate::index::db_name::DOCID_WORD_POSITIONS; | ||||
| use crate::{DocumentId, Result}; | ||||
|  | ||||
| /// Extracts the word positions and the documents ids where this word appear. | ||||
| /// | ||||
| /// Returns a grenad reader with the list of extracted words at positions and | ||||
| @@ -27,7 +30,8 @@ pub fn extract_word_level_position_docids<R: io::Read>( | ||||
|  | ||||
|     let mut key_buffer = Vec::new(); | ||||
|     while let Some((key, value)) = docid_word_positions.next()? { | ||||
|         let (document_id_bytes, word_bytes) = try_split_array_at(key).unwrap(); | ||||
|         let (document_id_bytes, word_bytes) = try_split_array_at(key) | ||||
|             .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; | ||||
|         let document_id = DocumentId::from_be_bytes(document_id_bytes); | ||||
|  | ||||
|         for position in read_u32_ne_bytes(value) { | ||||
|   | ||||
| @@ -1,15 +1,14 @@ | ||||
| use std::cmp::Ordering; | ||||
| use std::collections::{BinaryHeap, HashMap}; | ||||
| use std::fs::File; | ||||
| use std::time::{Duration, Instant}; | ||||
| use std::{cmp, io, mem, str, vec}; | ||||
|  | ||||
| use log::debug; | ||||
|  | ||||
| use super::helpers::{ | ||||
|     create_sorter, merge_cbo_roaring_bitmaps, read_u32_ne_bytes, sorter_into_reader, | ||||
|     try_split_array_at, GrenadParameters, MergeFn, | ||||
| }; | ||||
| use crate::error::SerializationError; | ||||
| use crate::index::db_name::DOCID_WORD_POSITIONS; | ||||
| use crate::proximity::{positions_proximity, MAX_DISTANCE}; | ||||
| use crate::{DocumentId, Result}; | ||||
|  | ||||
| @@ -32,16 +31,13 @@ pub fn extract_word_pair_proximity_docids<R: io::Read>( | ||||
|         max_memory.map(|m| m / 2), | ||||
|     ); | ||||
|  | ||||
|     let mut number_of_documents = 0; | ||||
|     let mut total_time_aggregation = Duration::default(); | ||||
|     let mut total_time_grenad_insert = Duration::default(); | ||||
|  | ||||
|     // This map is assumed to not consume a lot of memory. | ||||
|     let mut document_word_positions_heap = BinaryHeap::new(); | ||||
|     let mut current_document_id = None; | ||||
|  | ||||
|     while let Some((key, value)) = docid_word_positions.next()? { | ||||
|         let (document_id_bytes, word_bytes) = try_split_array_at(key).unwrap(); | ||||
|         let (document_id_bytes, word_bytes) = try_split_array_at(key) | ||||
|             .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; | ||||
|         let document_id = u32::from_be_bytes(document_id_bytes); | ||||
|         let word = str::from_utf8(word_bytes)?; | ||||
|  | ||||
| @@ -52,10 +48,7 @@ pub fn extract_word_pair_proximity_docids<R: io::Read>( | ||||
|                 curr_document_id, | ||||
|                 document_word_positions_heap, | ||||
|                 &mut word_pair_proximity_docids_sorter, | ||||
|                 &mut total_time_aggregation, | ||||
|                 &mut total_time_grenad_insert, | ||||
|             )?; | ||||
|             number_of_documents += 1; | ||||
|             current_document_id = Some(document_id); | ||||
|         } | ||||
|  | ||||
| @@ -74,18 +67,9 @@ pub fn extract_word_pair_proximity_docids<R: io::Read>( | ||||
|             document_id, | ||||
|             document_word_positions_heap, | ||||
|             &mut word_pair_proximity_docids_sorter, | ||||
|             &mut total_time_aggregation, | ||||
|             &mut total_time_grenad_insert, | ||||
|         )?; | ||||
|     } | ||||
|  | ||||
|     debug!( | ||||
|         "Number of documents {} | ||||
|         - we took {:02?} to aggregate proximities | ||||
|         - we took {:02?} to grenad insert those proximities", | ||||
|         number_of_documents, total_time_aggregation, total_time_grenad_insert, | ||||
|     ); | ||||
|  | ||||
|     sorter_into_reader(word_pair_proximity_docids_sorter, indexer) | ||||
| } | ||||
|  | ||||
| @@ -97,10 +81,7 @@ fn document_word_positions_into_sorter<'b>( | ||||
|     document_id: DocumentId, | ||||
|     mut word_positions_heap: BinaryHeap<PeekedWordPosition<vec::IntoIter<u32>>>, | ||||
|     word_pair_proximity_docids_sorter: &mut grenad::Sorter<MergeFn>, | ||||
|     total_time_aggregation: &mut Duration, | ||||
|     total_time_grenad_insert: &mut Duration, | ||||
| ) -> Result<()> { | ||||
|     let before_aggregating = Instant::now(); | ||||
|     let mut word_pair_proximity = HashMap::new(); | ||||
|     let mut ordered_peeked_word_positions = Vec::new(); | ||||
|     while !word_positions_heap.is_empty() { | ||||
| @@ -152,8 +133,6 @@ fn document_word_positions_into_sorter<'b>( | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     *total_time_aggregation += before_aggregating.elapsed(); | ||||
|  | ||||
|     let mut key_buffer = Vec::new(); | ||||
|     for ((w1, w2), prox) in word_pair_proximity { | ||||
|         key_buffer.clear(); | ||||
| @@ -162,9 +141,7 @@ fn document_word_positions_into_sorter<'b>( | ||||
|         key_buffer.extend_from_slice(w2.as_bytes()); | ||||
|         key_buffer.push(prox as u8); | ||||
|  | ||||
|         let before_grenad_insert = Instant::now(); | ||||
|         word_pair_proximity_docids_sorter.insert(&key_buffer, &document_id.to_ne_bytes())?; | ||||
|         *total_time_grenad_insert += before_grenad_insert.elapsed(); | ||||
|     } | ||||
|  | ||||
|     Ok(()) | ||||
|   | ||||
| @@ -225,5 +225,6 @@ fn extract_documents_data( | ||||
|                 Ok((docid_fid_facet_numbers_chunk, docid_fid_facet_strings_chunk)) | ||||
|             }, | ||||
|         ); | ||||
|  | ||||
|     Ok((docid_word_positions_chunk?, docid_fid_facet_values_chunks?)) | ||||
| } | ||||
|   | ||||
| @@ -2,6 +2,8 @@ use std::sync::Arc; | ||||
|  | ||||
| use memmap::Mmap; | ||||
|  | ||||
| /// Wrapper around Mmap allowing to virtualy clone grenad-chunks | ||||
| /// in a parallel process like the indexing. | ||||
| #[derive(Debug, Clone)] | ||||
| pub struct ClonableMmap { | ||||
|     inner: Arc<Mmap>, | ||||
|   | ||||
| @@ -3,7 +3,6 @@ use std::fs::File; | ||||
| use std::io::{self, Seek, SeekFrom}; | ||||
| use std::time::Instant; | ||||
|  | ||||
| use byte_unit::Byte; | ||||
| use grenad::{CompressionType, MergerIter, Reader, Sorter}; | ||||
| use heed::types::ByteSlice; | ||||
| use log::debug; | ||||
| @@ -113,6 +112,9 @@ impl Default for GrenadParameters { | ||||
| } | ||||
|  | ||||
| impl GrenadParameters { | ||||
|     /// This function use the number of threads in the current threadpool to compute the value. | ||||
|     /// This should be called inside of a rayon thread pool, | ||||
|     /// Otherwise, it will take the global number of threads. | ||||
|     pub fn max_memory_by_thread(&self) -> Option<usize> { | ||||
|         self.max_memory.map(|max_memory| max_memory / rayon::current_num_threads()) | ||||
|     } | ||||
| @@ -128,7 +130,7 @@ pub fn grenad_obkv_into_chunks<R: io::Read>( | ||||
|     mut reader: grenad::Reader<R>, | ||||
|     indexer: GrenadParameters, | ||||
|     log_frequency: Option<usize>, | ||||
|     documents_chunk_size: Byte, | ||||
|     documents_chunk_size: usize, | ||||
| ) -> Result<impl Iterator<Item = Result<grenad::Reader<File>>>> { | ||||
|     let mut document_count = 0; | ||||
|     let mut continue_reading = true; | ||||
| @@ -157,7 +159,7 @@ pub fn grenad_obkv_into_chunks<R: io::Read>( | ||||
|                 debug!("reached {} chunked documents", document_count); | ||||
|             } | ||||
|  | ||||
|             if current_chunk_size >= documents_chunk_size.get_bytes() { | ||||
|             if current_chunk_size >= documents_chunk_size as u64 { | ||||
|                 return writer_into_reader(obkv_documents).map(Some); | ||||
|             } | ||||
|         } | ||||
| @@ -170,8 +172,8 @@ pub fn grenad_obkv_into_chunks<R: io::Read>( | ||||
|         let result = transposer().transpose(); | ||||
|         if result.as_ref().map_or(false, |r| r.is_ok()) { | ||||
|             debug!( | ||||
|                 "A new chunk of approximately {} has been generated", | ||||
|                 documents_chunk_size.get_appropriate_unit(true), | ||||
|                 "A new chunk of approximately {:.2} MiB has been generated", | ||||
|                 documents_chunk_size as f64 / 1024.0 / 1024.0, | ||||
|             ); | ||||
|         } | ||||
|         result | ||||
|   | ||||
| @@ -40,10 +40,6 @@ where | ||||
|     Some((head, tail)) | ||||
| } | ||||
|  | ||||
| // pub fn pretty_thousands<A: Borrow<T>, T: fmt::Display>(number: A) -> String { | ||||
| //     thousands::Separable::separate_with_spaces(number.borrow()) | ||||
| // } | ||||
|  | ||||
| pub fn read_u32_ne_bytes(bytes: &[u8]) -> impl Iterator<Item = u32> + '_ { | ||||
|     bytes.chunks_exact(4).flat_map(TryInto::try_into).map(u32::from_ne_bytes) | ||||
| } | ||||
|   | ||||
| @@ -9,7 +9,6 @@ use std::iter::FromIterator; | ||||
| use std::num::{NonZeroU32, NonZeroUsize}; | ||||
| use std::time::Instant; | ||||
|  | ||||
| use byte_unit::Byte; | ||||
| use chrono::Utc; | ||||
| use crossbeam_channel::{Receiver, Sender}; | ||||
| use grenad::{self, CompressionType}; | ||||
| @@ -252,7 +251,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { | ||||
|                 documents_file, | ||||
|                 params.clone(), | ||||
|                 self.log_every_n, | ||||
|                 Byte::from_bytes(self.documents_chunk_size.unwrap_or(1024 * 1024 * 128) as u64), // 128MiB | ||||
|                 self.documents_chunk_size.unwrap_or(1024 * 1024 * 128), // 128MiB | ||||
|             ); | ||||
|  | ||||
|             let result = chunk_iter.map(|chunk_iter| { | ||||
|   | ||||
| @@ -1,3 +1,4 @@ | ||||
| use std::borrow::Cow; | ||||
| use std::fs::File; | ||||
|  | ||||
| use heed::types::ByteSlice; | ||||
| @@ -188,8 +189,6 @@ fn merge_roaring_bitmaps(new_value: &[u8], db_value: &[u8], buffer: &mut Vec<u8> | ||||
|     Ok(serialize_roaring_bitmap(&value, buffer)?) | ||||
| } | ||||
|  | ||||
| use std::borrow::Cow; | ||||
|  | ||||
| fn merge_cbo_roaring_bitmaps( | ||||
|     new_value: &[u8], | ||||
|     db_value: &[u8], | ||||
| @@ -199,11 +198,6 @@ fn merge_cbo_roaring_bitmaps( | ||||
|         &[Cow::Borrowed(db_value), Cow::Borrowed(new_value)], | ||||
|         buffer, | ||||
|     )?) | ||||
|  | ||||
|     // let new_value = CboRoaringBitmapCodec::deserialize_from(new_value)?; | ||||
|     // let db_value = CboRoaringBitmapCodec::deserialize_from(db_value)?; | ||||
|     // let value = new_value | db_value; | ||||
|     // Ok(CboRoaringBitmapCodec::serialize_into(&value, buffer)) | ||||
| } | ||||
|  | ||||
| /// Write provided entries in database using serialize_value function. | ||||
|   | ||||
		Reference in New Issue
	
	Block a user