mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-31 16:06:31 +00:00 
			
		
		
		
	Introduce the sorter_into_lmdb_database helper function
This commit is contained in:
		| @@ -8,7 +8,7 @@ use std::time::Instant; | ||||
|  | ||||
| use anyhow::Context; | ||||
| use bstr::ByteSlice as _; | ||||
| use grenad::{Writer, Sorter, Merger, Reader, FileFuse, CompressionType}; | ||||
| use grenad::{MergerIter, Writer, Sorter, Merger, Reader, FileFuse, CompressionType}; | ||||
| use heed::types::ByteSlice; | ||||
| use log::{debug, info, error}; | ||||
| use memmap::Mmap; | ||||
| @@ -102,39 +102,19 @@ pub fn merge_into_lmdb_database( | ||||
|     sources: Vec<Reader<FileFuse>>, | ||||
|     merge: MergeFn, | ||||
|     method: WriteMethod, | ||||
| ) -> anyhow::Result<()> { | ||||
| ) -> anyhow::Result<()> | ||||
| { | ||||
|     debug!("Merging {} MTBL stores...", sources.len()); | ||||
|     let before = Instant::now(); | ||||
|  | ||||
|     let merger = merge_readers(sources, merge); | ||||
|     let mut in_iter = merger.into_merge_iter()?; | ||||
|  | ||||
|     match method { | ||||
|         WriteMethod::Append => { | ||||
|             let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?; | ||||
|             while let Some((k, v)) = in_iter.next()? { | ||||
|                 out_iter.append(k, v).with_context(|| { | ||||
|                     format!("writing {:?} into LMDB", k.as_bstr()) | ||||
|                 })?; | ||||
|             } | ||||
|         }, | ||||
|         WriteMethod::GetMergePut => { | ||||
|             while let Some((k, v)) = in_iter.next()? { | ||||
|                 let mut iter = database.prefix_iter_mut::<_, ByteSlice, ByteSlice>(wtxn, k)?; | ||||
|                 match iter.next().transpose()? { | ||||
|                     Some((key, old_val)) if key == k => { | ||||
|                         let vals = vec![Cow::Borrowed(old_val), Cow::Borrowed(v)]; | ||||
|                         let val = merge(k, &vals).expect("merge failed"); | ||||
|                         iter.put_current(k, &val)?; | ||||
|                     }, | ||||
|                     _ => { | ||||
|                         drop(iter); | ||||
|                         database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?; | ||||
|                     }, | ||||
|                 } | ||||
|             } | ||||
|         }, | ||||
|     } | ||||
|     merger_iter_into_lmdb_database( | ||||
|         wtxn, | ||||
|         database, | ||||
|         merger.into_merge_iter()?, | ||||
|         merge, | ||||
|         method, | ||||
|     )?; | ||||
|  | ||||
|     debug!("MTBL stores merged in {:.02?}!", before.elapsed()); | ||||
|     Ok(()) | ||||
| @@ -146,7 +126,8 @@ pub fn write_into_lmdb_database( | ||||
|     mut reader: Reader<FileFuse>, | ||||
|     merge: MergeFn, | ||||
|     method: WriteMethod, | ||||
| ) -> anyhow::Result<()> { | ||||
| ) -> anyhow::Result<()> | ||||
| { | ||||
|     debug!("Writing MTBL stores..."); | ||||
|     let before = Instant::now(); | ||||
|  | ||||
| @@ -181,6 +162,67 @@ pub fn write_into_lmdb_database( | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| pub fn sorter_into_lmdb_database( | ||||
|     wtxn: &mut heed::RwTxn, | ||||
|     database: heed::PolyDatabase, | ||||
|     sorter: Sorter<MergeFn>, | ||||
|     merge: MergeFn, | ||||
|     method: WriteMethod, | ||||
| ) -> anyhow::Result<()> | ||||
| { | ||||
|     debug!("Writing MTBL sorter..."); | ||||
|     let before = Instant::now(); | ||||
|  | ||||
|     merger_iter_into_lmdb_database( | ||||
|         wtxn, | ||||
|         database, | ||||
|         sorter.into_iter()?, | ||||
|         merge, | ||||
|         method, | ||||
|     )?; | ||||
|  | ||||
|     debug!("MTBL sorter writen in {:.02?}!", before.elapsed()); | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| fn merger_iter_into_lmdb_database<R: io::Read>( | ||||
|     wtxn: &mut heed::RwTxn, | ||||
|     database: heed::PolyDatabase, | ||||
|     mut sorter: MergerIter<R, MergeFn>, | ||||
|     merge: MergeFn, | ||||
|     method: WriteMethod, | ||||
| ) -> anyhow::Result<()> | ||||
| { | ||||
|     match method { | ||||
|         WriteMethod::Append => { | ||||
|             let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?; | ||||
|             while let Some((k, v)) = sorter.next()? { | ||||
|                 out_iter.append(k, v).with_context(|| { | ||||
|                     format!("writing {:?} into LMDB", k.as_bstr()) | ||||
|                 })?; | ||||
|             } | ||||
|         }, | ||||
|         WriteMethod::GetMergePut => { | ||||
|             while let Some((k, v)) = sorter.next()? { | ||||
|                 let mut iter = database.prefix_iter_mut::<_, ByteSlice, ByteSlice>(wtxn, k)?; | ||||
|                 match iter.next().transpose()? { | ||||
|                     Some((key, old_val)) if key == k => { | ||||
|                         let vals = vec![Cow::Borrowed(old_val), Cow::Borrowed(v)]; | ||||
|                         let val = merge(k, &vals).expect("merge failed"); | ||||
|                         iter.put_current(k, &val)?; | ||||
|                     }, | ||||
|                     _ => { | ||||
|                         drop(iter); | ||||
|                         database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?; | ||||
|                     }, | ||||
|                 } | ||||
|             } | ||||
|         }, | ||||
|     } | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] | ||||
| #[non_exhaustive] | ||||
| pub enum IndexDocumentsMethod { | ||||
|   | ||||
| @@ -9,7 +9,7 @@ use heed::types::ByteSlice; | ||||
|  | ||||
| use crate::heed_codec::StrStrU8Codec; | ||||
| use crate::update::index_documents::WriteMethod; | ||||
| use crate::update::index_documents::{create_sorter, create_writer, writer_into_reader, write_into_lmdb_database}; | ||||
| use crate::update::index_documents::{create_sorter, sorter_into_lmdb_database}; | ||||
| use crate::update::index_documents::{word_docids_merge, words_pairs_proximities_docids_merge}; | ||||
| use crate::{Index, SmallString32}; | ||||
|  | ||||
| @@ -144,21 +144,11 @@ impl<'t, 'u, 'i> WordsPrefixes<'t, 'u, 'i> { | ||||
|         // Set the words prefixes FST in the dtabase. | ||||
|         self.index.put_words_prefixes_fst(self.wtxn, &prefix_fst)?; | ||||
|  | ||||
|         // We write the sorter into a reader to be able to read it back. | ||||
|         let mut prefix_docids_writer = tempfile::tempfile().and_then(|file| { | ||||
|             create_writer(self.chunk_compression_type, self.chunk_compression_level, file) | ||||
|         })?; | ||||
|         prefix_docids_sorter.write_into(&mut prefix_docids_writer)?; | ||||
|         let prefix_docids_reader = writer_into_reader( | ||||
|             prefix_docids_writer, | ||||
|             self.chunk_fusing_shrink_size, | ||||
|         )?; | ||||
|  | ||||
|         // We finally write the word prefix docids into the LMDB database. | ||||
|         write_into_lmdb_database( | ||||
|         sorter_into_lmdb_database( | ||||
|             self.wtxn, | ||||
|             *self.index.word_prefix_docids.as_polymorph(), | ||||
|             prefix_docids_reader, | ||||
|             prefix_docids_sorter, | ||||
|             word_docids_merge, | ||||
|             WriteMethod::Append, | ||||
|         )?; | ||||
| @@ -190,22 +180,11 @@ impl<'t, 'u, 'i> WordsPrefixes<'t, 'u, 'i> { | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         // FIXME we should create a sorter_into_lmdb_database function | ||||
|         // We write the sorter into a reader to be able to read it back. | ||||
|         let mut word_prefix_pair_prox_docids_writer = tempfile::tempfile().and_then(|file| { | ||||
|             create_writer(self.chunk_compression_type, self.chunk_compression_level, file) | ||||
|         })?; | ||||
|         word_prefix_pair_proximity_docids_sorter.write_into(&mut word_prefix_pair_prox_docids_writer)?; | ||||
|         let word_prefix_pair_docids_reader = writer_into_reader( | ||||
|             word_prefix_pair_prox_docids_writer, | ||||
|             self.chunk_fusing_shrink_size, | ||||
|         )?; | ||||
|  | ||||
|         // We finally write the word prefix pair proximity docids into the LMDB database. | ||||
|         write_into_lmdb_database( | ||||
|         sorter_into_lmdb_database( | ||||
|             self.wtxn, | ||||
|             *self.index.word_prefix_pair_proximity_docids.as_polymorph(), | ||||
|             word_prefix_pair_docids_reader, | ||||
|             word_prefix_pair_proximity_docids_sorter, | ||||
|             words_pairs_proximities_docids_merge, | ||||
|             WriteMethod::Append, | ||||
|         )?; | ||||
|   | ||||
		Reference in New Issue
	
	Block a user