Mirror of https://github.com/meilisearch/meilisearch.git

	Finally bump grenad to v0.4.1
@@ -16,7 +16,7 @@ either = "1.6.1"
 flate2 = "1.0.20"
 fst = "0.4.5"
 fxhash = "0.2.1"
-grenad = { version = "0.3.1", default-features = false, features = ["tempfile"] }
+grenad = { version = "0.4.1", default-features = false, features = ["tempfile"] }
 geoutils = "0.4.1"
 heed = { git = "https://github.com/Kerollmops/heed", tag = "v0.12.1", default-features = false, features = ["lmdb", "sync-read-txn"] }
 human_format = "1.0.3"

@@ -29,6 +29,7 @@ pub enum InternalError {
     FieldIdMapMissingEntry(FieldIdMapMissingEntry),
     Fst(fst::Error),
     GrenadInvalidCompressionType,
+    GrenadInvalidFormatVersion,
     IndexingMergingKeys { process: &'static str },
     InvalidDatabaseTyping,
     RayonThreadPool(ThreadPoolBuildError),
@@ -97,6 +98,9 @@ where
             grenad::Error::InvalidCompressionType => {
                 Error::InternalError(InternalError::GrenadInvalidCompressionType)
             }
+            grenad::Error::InvalidFormatVersion => {
+                Error::InternalError(InternalError::GrenadInvalidFormatVersion)
+            }
         }
     }
 }
@@ -186,6 +190,9 @@ impl fmt::Display for InternalError {
             Self::GrenadInvalidCompressionType => {
                 f.write_str("Invalid compression type have been specified to grenad.")
             }
+            Self::GrenadInvalidFormatVersion => {
+                f.write_str("Invalid grenad file with an invalid version format.")
+            }
             Self::IndexingMergingKeys { process } => {
                 write!(f, "Invalid merge while processing {}.", process)
             }

@@ -160,8 +160,7 @@ fn compute_facet_number_levels<'t>(
 
     // It is forbidden to keep a cursor and write in a database at the same time with LMDB
     // therefore we write the facet levels entries into a grenad file before transfering them.
-    let mut writer = tempfile::tempfile()
-        .and_then(|file| create_writer(compression_type, compression_level, file))?;
+    let mut writer = create_writer(compression_type, compression_level, tempfile::tempfile()?);
 
     let level_0_range = {
         let left = (field_id, 0, f64::MIN, f64::MIN);
@@ -279,8 +278,7 @@ fn compute_facet_string_levels<'t>(
 
     // It is forbidden to keep a cursor and write in a database at the same time with LMDB
     // therefore we write the facet levels entries into a grenad file before transfering them.
-    let mut writer = tempfile::tempfile()
-        .and_then(|file| create_writer(compression_type, compression_level, file))?;
+    let mut writer = create_writer(compression_type, compression_level, tempfile::tempfile()?);
 
     // Groups sizes are always a power of the original level_group_size and therefore a group
     // always maps groups of the previous level and never splits previous levels groups in half.

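These writer changes follow from a grenad 0.4 API change: milli's create_writer helper (see the helpers hunk further down) now returns the grenad::Writer directly instead of an io::Result, so the only fallible step left at a call site is creating the temporary file. A minimal sketch of that call shape, assuming the create_writer helper as defined in this commit (the wrapper function itself is hypothetical):

    use std::fs::File;
    use std::io;

    // Hypothetical convenience wrapper: only tempfile::tempfile() can fail;
    // building the grenad writer itself is infallible in grenad 0.4.
    fn temp_grenad_writer(
        typ: grenad::CompressionType,
        level: Option<u32>,
    ) -> io::Result<grenad::Writer<File>> {
        let file = tempfile::tempfile()?;
        Ok(create_writer(typ, level, file))
    }
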
@@ -18,8 +18,8 @@ use crate::{absolute_from_relative_position, FieldId, Result, MAX_POSITION_PER_A
 /// Returns the generated internal documents ids and a grenad reader
 /// with the list of extracted words from the given chunk of documents.
 #[logging_timer::time]
-pub fn extract_docid_word_positions<R: io::Read>(
-    mut obkv_documents: grenad::Reader<R>,
+pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
+    obkv_documents: grenad::Reader<R>,
     indexer: GrenadParameters,
     searchable_fields: &Option<HashSet<FieldId>>,
     stop_words: Option<&fst::Set<&[u8]>>,
@@ -46,7 +46,8 @@ pub fn extract_docid_word_positions<R: io::Read>(
     }
     let analyzer = Analyzer::<Vec<u8>>::new(AnalyzerConfig::default());
 
-    while let Some((key, value)) = obkv_documents.next()? {
+    let mut cursor = obkv_documents.into_cursor()?;
+    while let Some((key, value)) = cursor.move_on_next()? {
         let document_id = key
             .try_into()
             .map(u32::from_be_bytes)

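Note that in grenad 0.4 a Reader is no longer consumed entry by entry through next(); it is first turned into a cursor, which is why every extractor in this commit gains an io::Seek bound on its reader. A minimal sketch of the loop shape repeated in the hunks below, meant to live inside a function that already forwards grenad errors with the question-mark operator (handle_entry is a placeholder):

    // reader: grenad::Reader<R> with R: io::Read + io::Seek
    let mut cursor = reader.into_cursor()?;
    while let Some((key, value)) = cursor.move_on_next()? {
        // key and value are byte slices borrowed from the cursor's
        // current block, so copy them if they must outlive this iteration.
        handle_entry(key, value)?;
    }
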
@@ -14,8 +14,8 @@ use crate::Result;
 /// Returns a grenad reader with the list of extracted facet numbers and
 /// documents ids from the given chunk of docid facet number positions.
 #[logging_timer::time]
-pub fn extract_facet_number_docids<R: io::Read>(
-    mut docid_fid_facet_number: grenad::Reader<R>,
+pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
+    docid_fid_facet_number: grenad::Reader<R>,
     indexer: GrenadParameters,
 ) -> Result<grenad::Reader<File>> {
     let max_memory = indexer.max_memory_by_thread();
@@ -28,7 +28,8 @@ pub fn extract_facet_number_docids<R: io::Read>(
         max_memory,
     );
 
-    while let Some((key_bytes, _)) = docid_fid_facet_number.next()? {
+    let mut cursor = docid_fid_facet_number.into_cursor()?;
+    while let Some((key_bytes, _)) = cursor.move_on_next()? {
         let (field_id, document_id, number) =
             FieldDocIdFacetF64Codec::bytes_decode(key_bytes).unwrap();
 

@@ -16,8 +16,8 @@ use crate::{FieldId, Result};
 /// Returns a grenad reader with the list of extracted facet strings and
 /// documents ids from the given chunk of docid facet string positions.
 #[logging_timer::time]
-pub fn extract_facet_string_docids<R: io::Read>(
-    mut docid_fid_facet_string: grenad::Reader<R>,
+pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
+    docid_fid_facet_string: grenad::Reader<R>,
     indexer: GrenadParameters,
 ) -> Result<grenad::Reader<File>> {
     let max_memory = indexer.max_memory_by_thread();
@@ -32,7 +32,8 @@ pub fn extract_facet_string_docids<R: io::Read>(
 
     let mut key_buffer = Vec::new();
     let mut value_buffer = Vec::new();
-    while let Some((key, original_value_bytes)) = docid_fid_facet_string.next()? {
+    let mut cursor = docid_fid_facet_string.into_cursor()?;
+    while let Some((key, original_value_bytes)) = cursor.move_on_next()? {
         let (field_id_bytes, bytes) = try_split_array_at(key).unwrap();
         let field_id = FieldId::from_be_bytes(field_id_bytes);
         let (document_id_bytes, normalized_value_bytes) = try_split_array_at(bytes).unwrap();

@@ -16,8 +16,8 @@ use crate::{DocumentId, FieldId, Result};
 /// Returns the generated grenad reader containing the docid the fid and the orginal value as key
 /// and the normalized value as value extracted from the given chunk of documents.
 #[logging_timer::time]
-pub fn extract_fid_docid_facet_values<R: io::Read>(
-    mut obkv_documents: grenad::Reader<R>,
+pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
+    obkv_documents: grenad::Reader<R>,
     indexer: GrenadParameters,
     faceted_fields: &HashSet<FieldId>,
 ) -> Result<(grenad::Reader<File>, grenad::Reader<File>)> {
@@ -40,7 +40,8 @@ pub fn extract_fid_docid_facet_values<R: io::Read>(
     );
 
     let mut key_buffer = Vec::new();
-    while let Some((docid_bytes, value)) = obkv_documents.next()? {
+    let mut cursor = obkv_documents.into_cursor()?;
+    while let Some((docid_bytes, value)) = cursor.move_on_next()? {
         let obkv = obkv::KvReader::new(value);
 
         for (field_id, field_bytes) in obkv.iter() {

@@ -18,8 +18,8 @@ use crate::{relative_from_absolute_position, DocumentId, FieldId, Result};
 /// Returns a grenad reader with the list of extracted field id word counts
 /// and documents ids from the given chunk of docid word positions.
 #[logging_timer::time]
-pub fn extract_fid_word_count_docids<R: io::Read>(
-    mut docid_word_positions: grenad::Reader<R>,
+pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
+    docid_word_positions: grenad::Reader<R>,
     indexer: GrenadParameters,
 ) -> Result<grenad::Reader<File>> {
     let max_memory = indexer.max_memory_by_thread();
@@ -36,7 +36,8 @@ pub fn extract_fid_word_count_docids<R: io::Read>(
     let mut document_fid_wordcount = HashMap::new();
     let mut current_document_id = None;
 
-    while let Some((key, value)) = docid_word_positions.next()? {
+    let mut cursor = docid_word_positions.into_cursor()?;
+    while let Some((key, value)) = cursor.move_on_next()? {
         let (document_id_bytes, _word_bytes) = try_split_array_at(key)
             .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
         let document_id = u32::from_be_bytes(document_id_bytes);

@@ -10,17 +10,20 @@ use crate::{FieldId, InternalError, Result, UserError};
 /// Extracts the geographical coordinates contained in each document under the `_geo` field.
 ///
 /// Returns the generated grenad reader containing the docid as key associated to the (latitude, longitude)
-pub fn extract_geo_points<R: io::Read>(
-    mut obkv_documents: grenad::Reader<R>,
+pub fn extract_geo_points<R: io::Read + io::Seek>(
+    obkv_documents: grenad::Reader<R>,
     indexer: GrenadParameters,
     primary_key_id: FieldId,
     geo_field_id: FieldId,
 ) -> Result<grenad::Reader<File>> {
-    let mut writer = tempfile::tempfile().and_then(|file| {
-        create_writer(indexer.chunk_compression_type, indexer.chunk_compression_level, file)
-    })?;
+    let mut writer = create_writer(
+        indexer.chunk_compression_type,
+        indexer.chunk_compression_level,
+        tempfile::tempfile()?,
+    );
 
-    while let Some((docid_bytes, value)) = obkv_documents.next()? {
+    let mut cursor = obkv_documents.into_cursor()?;
+    while let Some((docid_bytes, value)) = cursor.move_on_next()? {
         let obkv = obkv::KvReader::new(value);
         let point: Value = match obkv.get(geo_field_id) {
             Some(point) => serde_json::from_slice(point).map_err(InternalError::SerdeJson)?,

@@ -17,8 +17,8 @@ use crate::Result;
 /// Returns a grenad reader with the list of extracted words and
 /// documents ids from the given chunk of docid word positions.
 #[logging_timer::time]
-pub fn extract_word_docids<R: io::Read>(
-    mut docid_word_positions: grenad::Reader<R>,
+pub fn extract_word_docids<R: io::Read + io::Seek>(
+    docid_word_positions: grenad::Reader<R>,
     indexer: GrenadParameters,
 ) -> Result<grenad::Reader<File>> {
     let max_memory = indexer.max_memory_by_thread();
@@ -32,7 +32,8 @@ pub fn extract_word_docids<R: io::Read>(
     );
 
     let mut value_buffer = Vec::new();
-    while let Some((key, _value)) = docid_word_positions.next()? {
+    let mut cursor = docid_word_positions.into_cursor()?;
+    while let Some((key, _value)) = cursor.move_on_next()? {
         let (document_id_bytes, word_bytes) = try_split_array_at(key)
             .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
         let document_id = u32::from_be_bytes(document_id_bytes);

@@ -17,8 +17,8 @@ use crate::{DocumentId, Result};
 /// Returns a grenad reader with the list of extracted word pairs proximities and
 /// documents ids from the given chunk of docid word positions.
 #[logging_timer::time]
-pub fn extract_word_pair_proximity_docids<R: io::Read>(
-    mut docid_word_positions: grenad::Reader<R>,
+pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
+    docid_word_positions: grenad::Reader<R>,
     indexer: GrenadParameters,
 ) -> Result<grenad::Reader<File>> {
     let max_memory = indexer.max_memory_by_thread();
@@ -35,7 +35,8 @@ pub fn extract_word_pair_proximity_docids<R: io::Read>(
     let mut document_word_positions_heap = BinaryHeap::new();
     let mut current_document_id = None;
 
-    while let Some((key, value)) = docid_word_positions.next()? {
+    let mut cursor = docid_word_positions.into_cursor()?;
+    while let Some((key, value)) = cursor.move_on_next()? {
         let (document_id_bytes, word_bytes) = try_split_array_at(key)
             .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
         let document_id = u32::from_be_bytes(document_id_bytes);

@@ -14,8 +14,8 @@ use crate::{DocumentId, Result};
 /// Returns a grenad reader with the list of extracted words at positions and
 /// documents ids from the given chunk of docid word positions.
 #[logging_timer::time]
-pub fn extract_word_position_docids<R: io::Read>(
-    mut docid_word_positions: grenad::Reader<R>,
+pub fn extract_word_position_docids<R: io::Read + io::Seek>(
+    docid_word_positions: grenad::Reader<R>,
     indexer: GrenadParameters,
 ) -> Result<grenad::Reader<File>> {
     let max_memory = indexer.max_memory_by_thread();
@@ -29,7 +29,8 @@ pub fn extract_word_position_docids<R: io::Read>(
     );
 
     let mut key_buffer = Vec::new();
-    while let Some((key, value)) = docid_word_positions.next()? {
+    let mut cursor = docid_word_positions.into_cursor()?;
+    while let Some((key, value)) = cursor.move_on_next()? {
         let (document_id_bytes, word_bytes) = try_split_array_at(key)
             .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
         let document_id = DocumentId::from_be_bytes(document_id_bytes);

@@ -17,7 +17,7 @@ pub fn create_writer<R: io::Write>(
     typ: grenad::CompressionType,
     level: Option<u32>,
     file: R,
-) -> io::Result<grenad::Writer<R>> {
+) -> grenad::Writer<R> {
     let mut builder = grenad::Writer::builder();
     builder.compression_type(typ);
     if let Some(level) = level {
@@ -52,10 +52,13 @@ pub fn sorter_into_reader(
     sorter: grenad::Sorter<MergeFn>,
     indexer: GrenadParameters,
 ) -> Result<grenad::Reader<File>> {
-    let mut writer = tempfile::tempfile().and_then(|file| {
-        create_writer(indexer.chunk_compression_type, indexer.chunk_compression_level, file)
-    })?;
-    sorter.write_into(&mut writer)?;
+    let mut writer = create_writer(
+        indexer.chunk_compression_type,
+        indexer.chunk_compression_level,
+        tempfile::tempfile()?,
+    );
+    sorter.write_into_stream_writer(&mut writer)?;
+
     Ok(writer_into_reader(writer)?)
 }
 
@@ -75,20 +78,25 @@ pub unsafe fn into_clonable_grenad(
     Ok(reader)
 }
 
-pub fn merge_readers<R: io::Read>(
+pub fn merge_readers<R: io::Read + io::Seek>(
     readers: Vec<grenad::Reader<R>>,
     merge_fn: MergeFn,
     indexer: GrenadParameters,
 ) -> Result<grenad::Reader<File>> {
     let mut merger_builder = grenad::MergerBuilder::new(merge_fn);
-    merger_builder.extend(readers);
+    for reader in readers {
+        merger_builder.push(reader.into_cursor()?);
+    }
+
     let merger = merger_builder.build();
-    let mut writer = tempfile::tempfile().and_then(|file| {
-        create_writer(indexer.chunk_compression_type, indexer.chunk_compression_level, file)
-    })?;
-    merger.write_into(&mut writer)?;
-    let reader = writer_into_reader(writer)?;
-    Ok(reader)
+    let mut writer = create_writer(
+        indexer.chunk_compression_type,
+        indexer.chunk_compression_level,
+        tempfile::tempfile()?,
+    );
+    merger.write_into_stream_writer(&mut writer)?;
+
+    Ok(writer_into_reader(writer)?)
 }
 
 #[derive(Debug, Clone, Copy)]
@@ -125,12 +133,13 @@ impl GrenadParameters {
 /// The grenad obkv entries are composed of an incremental document id big-endian
 /// encoded as the key and an obkv object with an `u8` for the field as the key
 /// and a simple UTF-8 encoded string as the value.
-pub fn grenad_obkv_into_chunks<R: io::Read>(
-    mut reader: grenad::Reader<R>,
+pub fn grenad_obkv_into_chunks<R: io::Read + io::Seek>(
+    reader: grenad::Reader<R>,
     indexer: GrenadParameters,
     documents_chunk_size: usize,
 ) -> Result<impl Iterator<Item = Result<grenad::Reader<File>>>> {
     let mut continue_reading = true;
+    let mut cursor = reader.into_cursor()?;
 
     let indexer_clone = indexer.clone();
     let mut transposer = move || {
@@ -139,15 +148,13 @@ pub fn grenad_obkv_into_chunks<R: io::Read>(
         }
 
         let mut current_chunk_size = 0u64;
-        let mut obkv_documents = tempfile::tempfile().and_then(|file| {
-            create_writer(
-                indexer_clone.chunk_compression_type,
-                indexer_clone.chunk_compression_level,
-                file,
-            )
-        })?;
+        let mut obkv_documents = create_writer(
+            indexer_clone.chunk_compression_type,
+            indexer_clone.chunk_compression_level,
+            tempfile::tempfile()?,
+        );
 
-        while let Some((document_id, obkv)) = reader.next()? {
+        while let Some((document_id, obkv)) = cursor.move_on_next()? {
             obkv_documents.insert(document_id, obkv)?;
             current_chunk_size += document_id.len() as u64 + obkv.len() as u64;
 
@@ -166,13 +173,14 @@ pub fn grenad_obkv_into_chunks<R: io::Read>(
 pub fn write_into_lmdb_database(
     wtxn: &mut heed::RwTxn,
     database: heed::PolyDatabase,
-    mut reader: Reader<File>,
+    reader: Reader<File>,
     merge: MergeFn,
 ) -> Result<()> {
     debug!("Writing MTBL stores...");
     let before = Instant::now();
 
-    while let Some((k, v)) = reader.next()? {
+    let mut cursor = reader.into_cursor()?;
+    while let Some((k, v)) = cursor.move_on_next()? {
         let mut iter = database.prefix_iter_mut::<_, ByteSlice, ByteSlice>(wtxn, k)?;
         match iter.next().transpose()? {
             Some((key, old_val)) if key == k => {
@@ -201,19 +209,19 @@ pub fn sorter_into_lmdb_database(
     debug!("Writing MTBL sorter...");
     let before = Instant::now();
 
-    merger_iter_into_lmdb_database(wtxn, database, sorter.into_merger_iter()?, merge)?;
+    merger_iter_into_lmdb_database(wtxn, database, sorter.into_stream_merger_iter()?, merge)?;
 
     debug!("MTBL sorter writen in {:.02?}!", before.elapsed());
     Ok(())
 }
 
-fn merger_iter_into_lmdb_database<R: io::Read>(
+fn merger_iter_into_lmdb_database<R: io::Read + io::Seek>(
     wtxn: &mut heed::RwTxn,
     database: heed::PolyDatabase,
-    mut sorter: MergerIter<R, MergeFn>,
+    mut merger_iter: MergerIter<R, MergeFn>,
     merge: MergeFn,
 ) -> Result<()> {
-    while let Some((k, v)) = sorter.next()? {
+    while let Some((k, v)) = merger_iter.next()? {
         let mut iter = database.prefix_iter_mut::<_, ByteSlice, ByteSlice>(wtxn, k)?;
         match iter.next().transpose()? {
             Some((key, old_val)) if key == k => {

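Taken together, the helper changes above cover the whole grenad 0.4 write/read cycle: build a writer infallibly, insert key-ordered entries, recover and rewind the backing file, then stream the entries back through a cursor. A small round-trip sketch assembled from the calls visible in this diff; Writer::builder, compression_type, insert, into_inner, into_cursor and move_on_next all appear above, while WriterBuilder::build and Reader::new are assumed from the grenad 0.4 API, and errors are unwrapped only for brevity:

    use std::io::{Seek, SeekFrom};

    fn main() {
        // Build a writer over a temporary file; this step no longer returns a Result.
        let mut builder = grenad::Writer::builder();
        builder.compression_type(grenad::CompressionType::None);
        let mut writer = builder.build(tempfile::tempfile().unwrap());

        // Entries must be inserted with lexicographically increasing keys.
        writer.insert(b"key-a", b"first value").unwrap();
        writer.insert(b"key-b", b"second value").unwrap();

        // Take the file back and rewind it before reading,
        // as the Transform hunks further down do.
        let mut file = writer.into_inner().unwrap();
        file.seek(SeekFrom::Start(0)).unwrap();

        // Read everything back with the cursor API used throughout this commit.
        let reader = grenad::Reader::new(file).unwrap();
        let mut cursor = reader.into_cursor().unwrap();
        while let Some((key, value)) = cursor.move_on_next().unwrap() {
            println!("{:?} => {:?}", key, value);
        }
    }
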
@@ -277,7 +277,7 @@ impl<'a, 'i> Transform<'a, 'i> {
         let mut available_documents_ids = AvailableDocumentsIds::from_documents_ids(&documents_ids);
 
         // consume sorter, in order to free the internal allocation, before creating a new one.
-        let mut iter = self.sorter.into_merger_iter()?;
+        let mut iter = self.sorter.into_stream_merger_iter()?;
 
         // Once we have sort and deduplicated the documents we write them into a final file.
         let mut final_sorter = create_sorter(
@@ -374,16 +374,15 @@ impl<'a, 'i> Transform<'a, 'i> {
         });
 
         // We create a final writer to write the new documents in order from the sorter.
-        let file = tempfile::tempfile()?;
         let mut writer = create_writer(
             self.indexer_settings.chunk_compression_type,
             self.indexer_settings.chunk_compression_level,
-            file,
-        )?;
+            tempfile::tempfile()?,
+        );
 
         // Once we have written all the documents into the final sorter, we write the documents
         // into this writer, extract the file and reset the seek to be able to read it again.
-        final_sorter.write_into(&mut writer)?;
+        final_sorter.write_into_stream_writer(&mut writer)?;
         let mut documents_file = writer.into_inner()?;
         documents_file.seek(SeekFrom::Start(0))?;
 
@@ -424,12 +423,11 @@ impl<'a, 'i> Transform<'a, 'i> {
         let documents_count = documents_ids.len() as usize;
 
         // We create a final writer to write the new documents in order from the sorter.
-        let file = tempfile::tempfile()?;
         let mut writer = create_writer(
             self.indexer_settings.chunk_compression_type,
             self.indexer_settings.chunk_compression_level,
-            file,
-        )?;
+            tempfile::tempfile()?,
+        );
 
         let mut obkv_buffer = Vec::new();
         for result in self.index.documents.iter(wtxn)? {

@@ -1,6 +1,7 @@
 use std::borrow::Cow;
 use std::convert::TryInto;
 use std::fs::File;
+use std::io;
 
 use heed::types::ByteSlice;
 use heed::{BytesDecode, RwTxn};
@@ -65,8 +66,9 @@ pub(crate) fn write_typed_chunk_into_index(
                 },
             )?;
         }
-        TypedChunk::Documents(mut obkv_documents_iter) => {
-            while let Some((key, value)) = obkv_documents_iter.next()? {
+        TypedChunk::Documents(obkv_documents_iter) => {
+            let mut cursor = obkv_documents_iter.into_cursor()?;
+            while let Some((key, value)) = cursor.move_on_next()? {
                 index.documents.remap_types::<ByteSlice, ByteSlice>().put(wtxn, key, value)?;
             }
         }
@@ -85,7 +87,7 @@
             return Ok((documents_ids, is_merged_database))
         }
         TypedChunk::WordDocids(word_docids_iter) => {
-            let mut word_docids_iter = unsafe { into_clonable_grenad(word_docids_iter) }?;
+            let word_docids_iter = unsafe { into_clonable_grenad(word_docids_iter) }?;
             append_entries_into_database(
                 word_docids_iter.clone(),
                 &index.word_docids,
@@ -97,7 +99,8 @@
 
             // create fst from word docids
             let mut builder = fst::SetBuilder::memory();
-            while let Some((word, _value)) = word_docids_iter.next()? {
+            let mut cursor = word_docids_iter.into_cursor()?;
+            while let Some((word, _value)) = cursor.move_on_next()? {
                 // This is a lexicographically ordered word position
                 // we use the key to construct the words fst.
                 builder.insert(word)?;
@@ -146,19 +149,21 @@
             )?;
             is_merged_database = true;
         }
-        TypedChunk::FieldIdDocidFacetNumbers(mut fid_docid_facet_number) => {
+        TypedChunk::FieldIdDocidFacetNumbers(fid_docid_facet_number) => {
            let index_fid_docid_facet_numbers =
                 index.field_id_docid_facet_f64s.remap_types::<ByteSlice, ByteSlice>();
-            while let Some((key, value)) = fid_docid_facet_number.next()? {
+            let mut cursor = fid_docid_facet_number.into_cursor()?;
+            while let Some((key, value)) = cursor.move_on_next()? {
                 if valid_lmdb_key(key) {
                     index_fid_docid_facet_numbers.put(wtxn, key, &value)?;
                 }
             }
         }
-        TypedChunk::FieldIdDocidFacetStrings(mut fid_docid_facet_string) => {
+        TypedChunk::FieldIdDocidFacetStrings(fid_docid_facet_string) => {
             let index_fid_docid_facet_strings =
                 index.field_id_docid_facet_strings.remap_types::<ByteSlice, ByteSlice>();
-            while let Some((key, value)) = fid_docid_facet_string.next()? {
+            let mut cursor = fid_docid_facet_string.into_cursor()?;
+            while let Some((key, value)) = cursor.move_on_next()? {
                 if valid_lmdb_key(key) {
                     index_fid_docid_facet_strings.put(wtxn, key, &value)?;
                 }
@@ -183,11 +188,12 @@
             )?;
             is_merged_database = true;
         }
-        TypedChunk::GeoPoints(mut geo_points) => {
+        TypedChunk::GeoPoints(geo_points) => {
             let mut rtree = index.geo_rtree(wtxn)?.unwrap_or_default();
             let mut geo_faceted_docids = index.geo_faceted_documents_ids(wtxn)?;
 
-            while let Some((key, value)) = geo_points.next()? {
+            let mut cursor = geo_points.into_cursor()?;
+            while let Some((key, value)) = cursor.move_on_next()? {
                 // convert the key back to a u32 (4 bytes)
                 let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap();
 
@@ -229,7 +235,7 @@ fn merge_cbo_roaring_bitmaps(
 /// Write provided entries in database using serialize_value function.
 /// merge_values function is used if an entry already exist in the database.
 fn write_entries_into_database<R, K, V, FS, FM>(
-    mut data: grenad::Reader<R>,
+    data: grenad::Reader<R>,
     database: &heed::Database<K, V>,
     wtxn: &mut RwTxn,
     index_is_empty: bool,
@@ -237,14 +243,15 @@
     merge_values: FM,
 ) -> Result<()>
 where
-    R: std::io::Read,
+    R: io::Read + io::Seek,
     FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>,
     FM: Fn(&[u8], &[u8], &mut Vec<u8>) -> Result<()>,
 {
     let mut buffer = Vec::new();
     let database = database.remap_types::<ByteSlice, ByteSlice>();
 
-    while let Some((key, value)) = data.next()? {
+    let mut cursor = data.into_cursor()?;
+    while let Some((key, value)) = cursor.move_on_next()? {
         if valid_lmdb_key(key) {
             buffer.clear();
             let value = if index_is_empty {
@@ -270,7 +277,7 @@ where
 /// All provided entries must be ordered.
 /// If the index is not empty, write_entries_into_database is called instead.
 fn append_entries_into_database<R, K, V, FS, FM>(
-    mut data: grenad::Reader<R>,
+    data: grenad::Reader<R>,
     database: &heed::Database<K, V>,
     wtxn: &mut RwTxn,
     index_is_empty: bool,
@@ -278,7 +285,7 @@ fn append_entries_into_database<R, K, V, FS, FM>(
     merge_values: FM,
 ) -> Result<()>
 where
-    R: std::io::Read,
+    R: io::Read + io::Seek,
     FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>,
     FM: Fn(&[u8], &[u8], &mut Vec<u8>) -> Result<()>,
 {
@@ -296,7 +303,8 @@ where
     let mut buffer = Vec::new();
     let mut database = database.iter_mut(wtxn)?.remap_types::<ByteSlice, ByteSlice>();
 
-    while let Some((key, value)) = data.next()? {
+    let mut cursor = data.into_cursor()?;
+    while let Some((key, value)) = cursor.move_on_next()? {
         if valid_lmdb_key(key) {
             buffer.clear();
             let value = serialize_value(value, &mut buffer)?;

@@ -51,8 +51,10 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
         );
 
         let mut word_docids_merger = MergerBuilder::new(merge_roaring_bitmaps);
-        word_docids_merger.extend(new_word_docids);
-        let mut word_docids_iter = word_docids_merger.build().into_merger_iter()?;
+        for reader in new_word_docids {
+            word_docids_merger.push(reader.into_cursor()?);
+        }
+        let mut word_docids_iter = word_docids_merger.build().into_stream_merger_iter()?;
 
         let mut current_prefixes: Option<&&[String]> = None;
         let mut prefixes_cache = HashMap::new();

@@ -77,8 +77,10 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> {
         // We retrieve and merge the created word pair proximities docids entries
         // for the newly added documents.
         let mut wppd_merger = MergerBuilder::new(merge_cbo_roaring_bitmaps);
-        wppd_merger.extend(new_word_pair_proximity_docids);
-        let mut wppd_iter = wppd_merger.build().into_merger_iter()?;
+        for reader in new_word_pair_proximity_docids {
+            wppd_merger.push(reader.into_cursor()?);
+        }
+        let mut wppd_iter = wppd_merger.build().into_stream_merger_iter()?;
 
         let mut word_prefix_pair_proximity_docids_sorter = create_sorter(
             merge_cbo_roaring_bitmaps,

@@ -73,9 +73,11 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> {
         );
 
         let mut word_position_docids_merger = MergerBuilder::new(merge_cbo_roaring_bitmaps);
-        word_position_docids_merger.extend(new_word_position_docids);
+        for reader in new_word_position_docids {
+            word_position_docids_merger.push(reader.into_cursor()?);
+        }
         let mut word_position_docids_iter =
-            word_position_docids_merger.build().into_merger_iter()?;
+            word_position_docids_merger.build().into_stream_merger_iter()?;
 
         // We fetch all the new common prefixes between the previous and new prefix fst.
         let mut buffer = Vec::new();

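All three word-prefix updates above migrate the same way: grenad 0.4's MergerBuilder is fed cursors one by one instead of a whole collection of readers, and the merged stream is obtained through into_stream_merger_iter. A condensed sketch of that shape using only calls that appear in this commit (merge_fn, readers and process are placeholders for the crate's own merge function, reader list and per-entry handling):

    // readers: Vec<grenad::Reader<R>> with R: io::Read + io::Seek
    let mut merger_builder = grenad::MergerBuilder::new(merge_fn);
    for reader in readers {
        // Each reader becomes a cursor before being pushed into the merger.
        merger_builder.push(reader.into_cursor()?);
    }

    // Iterate the merged, key-ordered stream.
    let mut merged_iter = merger_builder.build().into_stream_merger_iter()?;
    while let Some((key, value)) = merged_iter.next()? {
        process(key, value)?;
    }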