mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-25 13:06:27 +00:00 
			
		
		
		
	Introduce the UpdateBuilder and use it in the HTTP routes
This commit is contained in:
		
				
					committed by
					
						 Kerollmops
						Kerollmops
					
				
			
			
				
	
			
			
			
						parent
						
							5c62fbb6a8
						
					
				
				
					commit
					3889d956d9
				
			| @@ -1,410 +0,0 @@ | ||||
| use std::borrow::Cow; | ||||
| use std::fs::File; | ||||
| use std::io::{self, Seek, SeekFrom}; | ||||
| use std::sync::mpsc::sync_channel; | ||||
| use std::time::Instant; | ||||
|  | ||||
| use anyhow::Context; | ||||
| use bstr::ByteSlice as _; | ||||
| use grenad::{Writer, Sorter, Merger, Reader, FileFuse, CompressionType}; | ||||
| use heed::types::ByteSlice; | ||||
| use log::{debug, info, error}; | ||||
| use rayon::prelude::*; | ||||
| use roaring::RoaringBitmap; | ||||
| use structopt::StructOpt; | ||||
| use tempfile::tempfile; | ||||
|  | ||||
| use crate::FieldsIdsMap; | ||||
| use crate::index::Index; | ||||
| use self::store::Store; | ||||
| use self::merge_function::{ | ||||
|     main_merge, word_docids_merge, words_pairs_proximities_docids_merge, | ||||
|     docid_word_positions_merge, documents_merge, | ||||
| }; | ||||
|  | ||||
| pub use self::transform::{Transform, TransformOutput}; | ||||
|  | ||||
| mod merge_function; | ||||
| mod store; | ||||
| mod transform; | ||||
|  | ||||
| #[derive(Debug, Clone, StructOpt)] | ||||
| pub struct IndexerOpt { | ||||
|     /// The amount of documents to skip before printing | ||||
|     /// a log regarding the indexing advancement. | ||||
|     #[structopt(long, default_value = "1000000")] // 1m | ||||
|     pub log_every_n: usize, | ||||
|  | ||||
|     /// MTBL max number of chunks in bytes. | ||||
|     #[structopt(long)] | ||||
|     pub max_nb_chunks: Option<usize>, | ||||
|  | ||||
|     /// The maximum amount of memory to use for the MTBL buffer. It is recommended | ||||
|     /// to use something like 80%-90% of the available memory. | ||||
|     /// | ||||
|     /// It is automatically split by the number of jobs e.g. if you use 7 jobs | ||||
|     /// and 7 GB of max memory, each thread will use a maximum of 1 GB. | ||||
|     #[structopt(long, default_value = "7516192768")] // 7 GB | ||||
|     pub max_memory: usize, | ||||
|  | ||||
|     /// Size of the linked hash map cache when indexing. | ||||
|     /// The bigger it is, the faster the indexing is but the more memory it takes. | ||||
|     #[structopt(long, default_value = "500")] | ||||
|     pub linked_hash_map_size: usize, | ||||
|  | ||||
|     /// The name of the compression algorithm to use when compressing intermediate | ||||
|     /// chunks during indexing documents. | ||||
|     /// | ||||
|     /// Choosing a fast algorithm will make the indexing faster but may consume more memory. | ||||
|     #[structopt(long, default_value = "snappy", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])] | ||||
|     pub chunk_compression_type: CompressionType, | ||||
|  | ||||
|     /// The level of compression of the chosen algorithm. | ||||
|     #[structopt(long, requires = "chunk-compression-type")] | ||||
|     pub chunk_compression_level: Option<u32>, | ||||
|  | ||||
|     /// The number of bytes to remove from the begining of the chunks while reading/sorting | ||||
|     /// or merging them. | ||||
|     /// | ||||
|     /// File fusing must only be enable on file systems that support the `FALLOC_FL_COLLAPSE_RANGE`, | ||||
|     /// (i.e. ext4 and XFS). File fusing will only work if the `enable-chunk-fusing` is set. | ||||
|     #[structopt(long, default_value = "4294967296")] // 4 GB | ||||
|     pub chunk_fusing_shrink_size: u64, | ||||
|  | ||||
|     /// Enable the chunk fusing or not, this reduces the amount of disk used by a factor of 2. | ||||
|     #[structopt(long)] | ||||
|     pub enable_chunk_fusing: bool, | ||||
|  | ||||
|     /// Number of parallel jobs for indexing, defaults to # of CPUs. | ||||
|     #[structopt(long)] | ||||
|     pub indexing_jobs: Option<usize>, | ||||
| } | ||||
|  | ||||
/// Strategy used when writing entries into an LMDB database.
#[derive(Debug, Copy, Clone)]
enum WriteMethod {
    /// Entries are appended through a mutable database iterator.
    Append,
    /// Each key is looked up first; an existing value is merged with the
    /// incoming one before being written back.
    GetMergePut,
}
|  | ||||
| type MergeFn = for<'a> fn(&[u8], &[Cow<'a, [u8]>]) -> anyhow::Result<Vec<u8>>; | ||||
|  | ||||
| fn create_writer(typ: CompressionType, level: Option<u32>, file: File) -> io::Result<Writer<File>> { | ||||
|     let mut builder = Writer::builder(); | ||||
|     builder.compression_type(typ); | ||||
|     if let Some(level) = level { | ||||
|         builder.compression_level(level); | ||||
|     } | ||||
|     builder.build(file) | ||||
| } | ||||
|  | ||||
| fn create_sorter( | ||||
|     merge: MergeFn, | ||||
|     chunk_compression_type: CompressionType, | ||||
|     chunk_compression_level: Option<u32>, | ||||
|     chunk_fusing_shrink_size: Option<u64>, | ||||
|     max_nb_chunks: Option<usize>, | ||||
|     max_memory: Option<usize>, | ||||
| ) -> Sorter<MergeFn> | ||||
| { | ||||
|     let mut builder = Sorter::builder(merge); | ||||
|     if let Some(shrink_size) = chunk_fusing_shrink_size { | ||||
|         builder.file_fusing_shrink_size(shrink_size); | ||||
|     } | ||||
|     builder.chunk_compression_type(chunk_compression_type); | ||||
|     if let Some(level) = chunk_compression_level { | ||||
|         builder.chunk_compression_level(level); | ||||
|     } | ||||
|     if let Some(nb_chunks) = max_nb_chunks { | ||||
|         builder.max_nb_chunks(nb_chunks); | ||||
|     } | ||||
|     if let Some(memory) = max_memory { | ||||
|         builder.max_memory(memory); | ||||
|     } | ||||
|     builder.build() | ||||
| } | ||||
|  | ||||
| fn writer_into_reader(writer: Writer<File>, shrink_size: Option<u64>) -> anyhow::Result<Reader<FileFuse>> { | ||||
|     let mut file = writer.into_inner()?; | ||||
|     file.seek(SeekFrom::Start(0))?; | ||||
|     let file = if let Some(shrink_size) = shrink_size { | ||||
|         FileFuse::builder().shrink_size(shrink_size).build(file) | ||||
|     } else { | ||||
|         FileFuse::new(file) | ||||
|     }; | ||||
|     Reader::new(file).map_err(Into::into) | ||||
| } | ||||
|  | ||||
| fn merge_readers(sources: Vec<Reader<FileFuse>>, merge: MergeFn) -> Merger<FileFuse, MergeFn> { | ||||
|     let mut builder = Merger::builder(merge); | ||||
|     builder.extend(sources); | ||||
|     builder.build() | ||||
| } | ||||
|  | ||||
| fn merge_into_lmdb_database( | ||||
|     wtxn: &mut heed::RwTxn, | ||||
|     database: heed::PolyDatabase, | ||||
|     sources: Vec<Reader<FileFuse>>, | ||||
|     merge: MergeFn, | ||||
|     method: WriteMethod, | ||||
| ) -> anyhow::Result<()> { | ||||
|     debug!("Merging {} MTBL stores...", sources.len()); | ||||
|     let before = Instant::now(); | ||||
|  | ||||
|     let merger = merge_readers(sources, merge); | ||||
|     let mut in_iter = merger.into_merge_iter()?; | ||||
|  | ||||
|     match method { | ||||
|         WriteMethod::Append => { | ||||
|             let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?; | ||||
|             while let Some((k, v)) = in_iter.next()? { | ||||
|                 out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?; | ||||
|             } | ||||
|         }, | ||||
|         WriteMethod::GetMergePut => { | ||||
|             while let Some((k, v)) = in_iter.next()? { | ||||
|                 match database.get::<_, ByteSlice, ByteSlice>(wtxn, k)? { | ||||
|                     Some(old_val) => { | ||||
|                         let vals = vec![Cow::Borrowed(old_val), Cow::Borrowed(v)]; | ||||
|                         let val = merge(k, &vals).expect("merge failed"); | ||||
|                         database.put::<_, ByteSlice, ByteSlice>(wtxn, k, &val)? | ||||
|                     }, | ||||
|                     None => database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?, | ||||
|                 } | ||||
|             } | ||||
|         }, | ||||
|     } | ||||
|  | ||||
|     debug!("MTBL stores merged in {:.02?}!", before.elapsed()); | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| fn write_into_lmdb_database( | ||||
|     wtxn: &mut heed::RwTxn, | ||||
|     database: heed::PolyDatabase, | ||||
|     mut reader: Reader<FileFuse>, | ||||
|     merge: MergeFn, | ||||
|     method: WriteMethod, | ||||
| ) -> anyhow::Result<()> { | ||||
|     debug!("Writing MTBL stores..."); | ||||
|     let before = Instant::now(); | ||||
|  | ||||
|     match method { | ||||
|         WriteMethod::Append => { | ||||
|             let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?; | ||||
|             while let Some((k, v)) = reader.next()? { | ||||
|                 out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?; | ||||
|             } | ||||
|         }, | ||||
|         WriteMethod::GetMergePut => { | ||||
|             while let Some((k, v)) = reader.next()? { | ||||
|                 match database.get::<_, ByteSlice, ByteSlice>(wtxn, k)? { | ||||
|                     Some(old_val) => { | ||||
|                         let vals = vec![Cow::Borrowed(old_val), Cow::Borrowed(v)]; | ||||
|                         let val = merge(k, &vals).expect("merge failed"); | ||||
|                         database.put::<_, ByteSlice, ByteSlice>(wtxn, k, &val)? | ||||
|                     }, | ||||
|                     None => database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?, | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     debug!("MTBL stores merged in {:.02?}!", before.elapsed()); | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| pub fn run<F>( | ||||
|     env: &heed::Env, | ||||
|     index: &Index, | ||||
|     opt: &IndexerOpt, | ||||
|     fields_ids_map: FieldsIdsMap, | ||||
|     users_ids_documents_ids: fst::Map<Vec<u8>>, | ||||
|     new_documents_ids: RoaringBitmap, | ||||
|     documents: grenad::Reader<&[u8]>, | ||||
|     documents_count: u32, | ||||
|     progress_callback: F, | ||||
| ) -> anyhow::Result<()> | ||||
| where F: Fn(u32, u32) + Sync + Send, | ||||
| { | ||||
|     let jobs = opt.indexing_jobs.unwrap_or(0); | ||||
|     let pool = rayon::ThreadPoolBuilder::new().num_threads(jobs).build()?; | ||||
|     pool.install(|| { | ||||
|         run_intern( | ||||
|             env, | ||||
|             index, | ||||
|             opt, | ||||
|             fields_ids_map, | ||||
|             users_ids_documents_ids, | ||||
|             new_documents_ids, | ||||
|             documents, | ||||
|             documents_count, | ||||
|             progress_callback, | ||||
|         ) | ||||
|     }) | ||||
| } | ||||
|  | ||||
| fn run_intern<F>( | ||||
|     env: &heed::Env, | ||||
|     index: &Index, | ||||
|     opt: &IndexerOpt, | ||||
|     fields_ids_map: FieldsIdsMap, | ||||
|     users_ids_documents_ids: fst::Map<Vec<u8>>, | ||||
|     new_documents_ids: RoaringBitmap, | ||||
|     documents: grenad::Reader<&[u8]>, | ||||
|     documents_count: u32, | ||||
|     progress_callback: F, | ||||
| ) -> anyhow::Result<()> | ||||
| where F: Fn(u32, u32) + Sync + Send, | ||||
| { | ||||
|     let before_indexing = Instant::now(); | ||||
|     let num_threads = rayon::current_num_threads(); | ||||
|     let linked_hash_map_size = opt.linked_hash_map_size; | ||||
|     let max_nb_chunks = opt.max_nb_chunks; | ||||
|     let max_memory_by_job = opt.max_memory / num_threads; | ||||
|     let chunk_compression_type = opt.chunk_compression_type; | ||||
|     let chunk_compression_level = opt.chunk_compression_level; | ||||
|     let log_every_n = opt.log_every_n; | ||||
|  | ||||
|     let chunk_fusing_shrink_size = if opt.enable_chunk_fusing { | ||||
|         Some(opt.chunk_fusing_shrink_size) | ||||
|     } else { | ||||
|         None | ||||
|     }; | ||||
|  | ||||
|     let readers = rayon::iter::repeatn(documents, num_threads) | ||||
|         .enumerate() | ||||
|         .map(|(i, documents)| { | ||||
|             let store = Store::new( | ||||
|                 linked_hash_map_size, | ||||
|                 max_nb_chunks, | ||||
|                 Some(max_memory_by_job), | ||||
|                 chunk_compression_type, | ||||
|                 chunk_compression_level, | ||||
|                 chunk_fusing_shrink_size, | ||||
|             )?; | ||||
|             store.index(documents, documents_count, i, num_threads, log_every_n, &progress_callback) | ||||
|         }) | ||||
|         .collect::<Result<Vec<_>, _>>()?; | ||||
|  | ||||
|     let mut main_readers = Vec::with_capacity(readers.len()); | ||||
|     let mut word_docids_readers = Vec::with_capacity(readers.len()); | ||||
|     let mut docid_word_positions_readers = Vec::with_capacity(readers.len()); | ||||
|     let mut words_pairs_proximities_docids_readers = Vec::with_capacity(readers.len()); | ||||
|     let mut documents_readers = Vec::with_capacity(readers.len()); | ||||
|     readers.into_iter().for_each(|readers| { | ||||
|         main_readers.push(readers.main); | ||||
|         word_docids_readers.push(readers.word_docids); | ||||
|         docid_word_positions_readers.push(readers.docid_word_positions); | ||||
|         words_pairs_proximities_docids_readers.push(readers.words_pairs_proximities_docids); | ||||
|         documents_readers.push(readers.documents); | ||||
|     }); | ||||
|  | ||||
|     // This is the function that merge the readers | ||||
|     // by using the given merge function. | ||||
|     let merge_readers = move |readers, merge| { | ||||
|         let mut writer = tempfile().and_then(|f| { | ||||
|             create_writer(chunk_compression_type, chunk_compression_level, f) | ||||
|         })?; | ||||
|         let merger = merge_readers(readers, merge); | ||||
|         merger.write_into(&mut writer)?; | ||||
|         writer_into_reader(writer, chunk_fusing_shrink_size) | ||||
|     }; | ||||
|  | ||||
|     // The enum and the channel which is used to transfert | ||||
|     // the readers merges potentially done on another thread. | ||||
|     enum DatabaseType { Main, WordDocids, WordsPairsProximitiesDocids }; | ||||
|     let (sender, receiver) = sync_channel(3); | ||||
|  | ||||
|     debug!("Merging the main, word docids and words pairs proximity docids in parallel..."); | ||||
|     rayon::spawn(move || { | ||||
|         vec![ | ||||
|             (DatabaseType::Main, main_readers, main_merge as MergeFn), | ||||
|             (DatabaseType::WordDocids, word_docids_readers, word_docids_merge), | ||||
|             ( | ||||
|                 DatabaseType::WordsPairsProximitiesDocids, | ||||
|                 words_pairs_proximities_docids_readers, | ||||
|                 words_pairs_proximities_docids_merge, | ||||
|             ), | ||||
|         ] | ||||
|         .into_par_iter() | ||||
|         .for_each(|(dbtype, readers, merge)| { | ||||
|             let result = merge_readers(readers, merge); | ||||
|             if let Err(e) = sender.send((dbtype, result)) { | ||||
|                 error!("sender error: {}", e); | ||||
|             } | ||||
|         }); | ||||
|     }); | ||||
|  | ||||
|     // We create the write transaction of this update. | ||||
|     // TODO we must get this transaction as an argument to be able | ||||
|     //      to first delete the replaced documents for example. | ||||
|     let mut wtxn = env.write_txn()?; | ||||
|  | ||||
|     let mut documents_ids = index.documents_ids(&wtxn)?; | ||||
|     let contains_documents = !documents_ids.is_empty(); | ||||
|     let write_method = if contains_documents { | ||||
|         WriteMethod::GetMergePut | ||||
|     } else { | ||||
|         WriteMethod::Append | ||||
|     }; | ||||
|  | ||||
|     // We write the fields ids map into the main database | ||||
|     index.put_fields_ids_map(&mut wtxn, &fields_ids_map)?; | ||||
|  | ||||
|     // We write the users_ids_documents_ids into the main database. | ||||
|     index.put_users_ids_documents_ids(&mut wtxn, &users_ids_documents_ids)?; | ||||
|  | ||||
|     // We merge the new documents ids with the existing ones. | ||||
|     documents_ids.union_with(&new_documents_ids); | ||||
|     index.put_documents_ids(&mut wtxn, &documents_ids)?; | ||||
|  | ||||
|     debug!("Writing the docid word positions into LMDB on disk..."); | ||||
|     merge_into_lmdb_database( | ||||
|         &mut wtxn, | ||||
|         *index.docid_word_positions.as_polymorph(), | ||||
|         docid_word_positions_readers, | ||||
|         docid_word_positions_merge, | ||||
|         write_method | ||||
|     )?; | ||||
|  | ||||
|     debug!("Writing the documents into LMDB on disk..."); | ||||
|     merge_into_lmdb_database( | ||||
|         &mut wtxn, | ||||
|         *index.documents.as_polymorph(), | ||||
|         documents_readers, | ||||
|         documents_merge, | ||||
|         write_method | ||||
|     )?; | ||||
|  | ||||
|     for (db_type, result) in receiver { | ||||
|         let content = result?; | ||||
|         match db_type { | ||||
|             DatabaseType::Main => { | ||||
|                 debug!("Writing the main elements into LMDB on disk..."); | ||||
|                 write_into_lmdb_database(&mut wtxn, index.main, content, main_merge, write_method)?; | ||||
|             }, | ||||
|             DatabaseType::WordDocids => { | ||||
|                 debug!("Writing the words docids into LMDB on disk..."); | ||||
|                 let db = *index.word_docids.as_polymorph(); | ||||
|                 write_into_lmdb_database(&mut wtxn, db, content, word_docids_merge, write_method)?; | ||||
|             }, | ||||
|             DatabaseType::WordsPairsProximitiesDocids => { | ||||
|                 debug!("Writing the words pairs proximities docids into LMDB on disk..."); | ||||
|                 let db = *index.word_pair_proximity_docids.as_polymorph(); | ||||
|                 write_into_lmdb_database( | ||||
|                     &mut wtxn, | ||||
|                     db, | ||||
|                     content, | ||||
|                     words_pairs_proximities_docids_merge, | ||||
|                     write_method, | ||||
|                 )?; | ||||
|             }, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     wtxn.commit()?; | ||||
|  | ||||
|     info!("Update processed in {:.02?}", before_indexing.elapsed()); | ||||
|  | ||||
|     Ok(()) | ||||
| } | ||||
| @@ -1,7 +1,6 @@ | ||||
| mod criterion; | ||||
| mod fields_ids_map; | ||||
| mod index; | ||||
| mod indexing; | ||||
| mod mdfs; | ||||
| mod query_tokens; | ||||
| mod search; | ||||
|   | ||||
| @@ -11,6 +11,7 @@ use askama_warp::Template; | ||||
| use flate2::read::GzDecoder; | ||||
| use futures::stream; | ||||
| use futures::{FutureExt, StreamExt}; | ||||
| use grenad::CompressionType; | ||||
| use heed::EnvOpenOptions; | ||||
| use indexmap::IndexMap; | ||||
| use serde::{Serialize, Deserialize}; | ||||
| @@ -21,9 +22,8 @@ use tokio::sync::broadcast; | ||||
| use warp::filters::ws::Message; | ||||
| use warp::{Filter, http::Response}; | ||||
|  | ||||
| use crate::indexing::{self, IndexerOpt, Transform, TransformOutput}; | ||||
| use crate::tokenizer::{simple_tokenizer, TokenType}; | ||||
| use crate::update::AvailableDocumentsIds; | ||||
| use crate::update::{UpdateBuilder, IndexDocumentsMethod}; | ||||
| use crate::{Index, UpdateStore, SearchResult}; | ||||
|  | ||||
| #[derive(Debug, StructOpt)] | ||||
| @@ -60,6 +60,58 @@ pub struct Opt { | ||||
|     indexer: IndexerOpt, | ||||
| } | ||||
|  | ||||
| #[derive(Debug, Clone, StructOpt)] | ||||
| pub struct IndexerOpt { | ||||
|     /// The amount of documents to skip before printing | ||||
|     /// a log regarding the indexing advancement. | ||||
|     #[structopt(long, default_value = "1000000")] // 1m | ||||
|     pub log_every_n: usize, | ||||
|  | ||||
|     /// MTBL max number of chunks in bytes. | ||||
|     #[structopt(long)] | ||||
|     pub max_nb_chunks: Option<usize>, | ||||
|  | ||||
|     /// The maximum amount of memory to use for the MTBL buffer. It is recommended | ||||
|     /// to use something like 80%-90% of the available memory. | ||||
|     /// | ||||
|     /// It is automatically split by the number of jobs e.g. if you use 7 jobs | ||||
|     /// and 7 GB of max memory, each thread will use a maximum of 1 GB. | ||||
|     #[structopt(long, default_value = "7516192768")] // 7 GB | ||||
|     pub max_memory: usize, | ||||
|  | ||||
|     /// Size of the linked hash map cache when indexing. | ||||
|     /// The bigger it is, the faster the indexing is but the more memory it takes. | ||||
|     #[structopt(long, default_value = "500")] | ||||
|     pub linked_hash_map_size: usize, | ||||
|  | ||||
|     /// The name of the compression algorithm to use when compressing intermediate | ||||
|     /// chunks during indexing documents. | ||||
|     /// | ||||
|     /// Choosing a fast algorithm will make the indexing faster but may consume more memory. | ||||
|     #[structopt(long, default_value = "snappy", possible_values = &["snappy", "zlib", "lz4", "lz4hc", "zstd"])] | ||||
|     pub chunk_compression_type: CompressionType, | ||||
|  | ||||
|     /// The level of compression of the chosen algorithm. | ||||
|     #[structopt(long, requires = "chunk-compression-type")] | ||||
|     pub chunk_compression_level: Option<u32>, | ||||
|  | ||||
|     /// The number of bytes to remove from the begining of the chunks while reading/sorting | ||||
|     /// or merging them. | ||||
|     /// | ||||
|     /// File fusing must only be enable on file systems that support the `FALLOC_FL_COLLAPSE_RANGE`, | ||||
|     /// (i.e. ext4 and XFS). File fusing will only work if the `enable-chunk-fusing` is set. | ||||
|     #[structopt(long, default_value = "4294967296")] // 4 GB | ||||
|     pub chunk_fusing_shrink_size: u64, | ||||
|  | ||||
|     /// Enable the chunk fusing or not, this reduces the amount of disk used by a factor of 2. | ||||
|     #[structopt(long)] | ||||
|     pub enable_chunk_fusing: bool, | ||||
|  | ||||
|     /// Number of parallel jobs for indexing, defaults to # of CPUs. | ||||
|     #[structopt(long)] | ||||
|     pub indexing_jobs: Option<usize>, | ||||
| } | ||||
|  | ||||
| fn highlight_record(record: &mut IndexMap<String, String>, words: &HashSet<String>) { | ||||
|     for (_key, value) in record.iter_mut() { | ||||
|         let old_value = mem::take(value); | ||||
| @@ -152,25 +204,36 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { | ||||
|         update_store_options, | ||||
|         update_store_path, | ||||
|         move |update_id, meta, content| { | ||||
|             let result = match meta { | ||||
|             // We prepare the update by using the update builder. | ||||
|             let mut update_builder = UpdateBuilder::new(); | ||||
|             if let Some(max_nb_chunks) = indexer_opt_cloned.max_nb_chunks { | ||||
|                 update_builder.max_nb_chunks(max_nb_chunks); | ||||
|             } | ||||
|             if let Some(chunk_compression_level) = indexer_opt_cloned.chunk_compression_level { | ||||
|                 update_builder.chunk_compression_level(chunk_compression_level); | ||||
|             } | ||||
|             if let Some(indexing_jobs) = indexer_opt_cloned.indexing_jobs { | ||||
|                 update_builder.indexing_jobs(indexing_jobs); | ||||
|             } | ||||
|             update_builder.log_every_n(indexer_opt_cloned.log_every_n); | ||||
|             update_builder.max_memory(indexer_opt_cloned.max_memory); | ||||
|             update_builder.linked_hash_map_size(indexer_opt_cloned.linked_hash_map_size); | ||||
|             update_builder.chunk_compression_type(indexer_opt_cloned.chunk_compression_type); | ||||
|             update_builder.chunk_fusing_shrink_size(indexer_opt_cloned.chunk_fusing_shrink_size); | ||||
|  | ||||
|             // we extract the update type and execute the update itself. | ||||
|             let result: anyhow::Result<()> = match meta { | ||||
|                 UpdateMeta::DocumentsAddition => { | ||||
|                     // We must use the write transaction of the update here. | ||||
|                     let rtxn = env_cloned.read_txn()?; | ||||
|                     let fields_ids_map = index_cloned.fields_ids_map(&rtxn)?; | ||||
|                     let documents_ids = index_cloned.documents_ids(&rtxn)?; | ||||
|                     let available_documents_ids = AvailableDocumentsIds::from_documents_ids(&documents_ids); | ||||
|                     let users_ids_documents_ids = index_cloned.users_ids_documents_ids(&rtxn).unwrap(); | ||||
|                     let mut wtxn = env_cloned.write_txn()?; | ||||
|                     let mut builder = update_builder.index_documents(&mut wtxn, &index_cloned); | ||||
|  | ||||
|                     let transform = Transform { | ||||
|                         fields_ids_map, | ||||
|                         available_documents_ids, | ||||
|                         users_ids_documents_ids, | ||||
|                         chunk_compression_type: indexer_opt_cloned.chunk_compression_type, | ||||
|                         chunk_compression_level: indexer_opt_cloned.chunk_compression_level, | ||||
|                         chunk_fusing_shrink_size: Some(indexer_opt_cloned.chunk_fusing_shrink_size), | ||||
|                         max_nb_chunks: indexer_opt_cloned.max_nb_chunks, | ||||
|                         max_memory: Some(indexer_opt_cloned.max_memory), | ||||
|                     }; | ||||
|                     let replace_documents = true; | ||||
|                     if replace_documents { | ||||
|                         builder.index_documents_method(IndexDocumentsMethod::ReplaceDocuments); | ||||
|                     } else { | ||||
|                         builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments); | ||||
|                     } | ||||
|  | ||||
|                     let gzipped = false; | ||||
|                     let reader = if gzipped { | ||||
| @@ -179,41 +242,22 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { | ||||
|                         Box::new(content) as Box<dyn io::Read> | ||||
|                     }; | ||||
|  | ||||
|                     let TransformOutput { | ||||
|                         fields_ids_map, | ||||
|                         users_ids_documents_ids, | ||||
|                         new_documents_ids, | ||||
|                         replaced_documents_ids, | ||||
|                         documents_count, | ||||
|                         documents_file, | ||||
|                     } = transform.from_csv(reader).unwrap(); | ||||
|                     let result = builder.execute(reader, |count, total| { | ||||
|                         let _ = update_status_sender_cloned.send(UpdateStatus::Progressing { | ||||
|                             update_id, | ||||
|                             meta: UpdateMetaProgress::DocumentsAddition { | ||||
|                                 processed_number_of_documents: count, | ||||
|                                 total_number_of_documents: Some(total), | ||||
|                             } | ||||
|                         }); | ||||
|                     }); | ||||
|  | ||||
|                     drop(rtxn); | ||||
|  | ||||
|                     let mmap = unsafe { memmap::Mmap::map(&documents_file)? }; | ||||
|                     let documents = grenad::Reader::new(mmap.as_ref()).unwrap(); | ||||
|  | ||||
|                     indexing::run( | ||||
|                         &env_cloned, | ||||
|                         &index_cloned, | ||||
|                         &indexer_opt_cloned, | ||||
|                         fields_ids_map, | ||||
|                         users_ids_documents_ids, | ||||
|                         new_documents_ids, | ||||
|                         documents, | ||||
|                         documents_count as u32, | ||||
|                         |count, total| { | ||||
|                             // We send progress status... | ||||
|                             let meta = UpdateMetaProgress::DocumentsAddition { | ||||
|                                 processed_number_of_documents: count as usize, | ||||
|                                 total_number_of_documents: Some(total as usize), | ||||
|                             }; | ||||
|                             let progress = UpdateStatus::Progressing { update_id, meta }; | ||||
|                             let _ = update_status_sender_cloned.send(progress); | ||||
|                         }, | ||||
|                     ) | ||||
|                     match result { | ||||
|                         Ok(()) => wtxn.commit().map_err(Into::into), | ||||
|                         Err(e) => Err(e.into()) | ||||
|                     } | ||||
|                 }, | ||||
|                 UpdateMeta::DocumentsAdditionFromPath { path } => { | ||||
|                 UpdateMeta::DocumentsAdditionFromPath { path: _ } => { | ||||
|                     todo!() | ||||
|                 } | ||||
|             }; | ||||
|   | ||||
| @@ -69,3 +69,19 @@ pub fn words_pairs_proximities_docids_merge(_key: &[u8], values: &[Cow<[u8]>]) - | ||||
| pub fn documents_merge(key: &[u8], _values: &[Cow<[u8]>]) -> anyhow::Result<Vec<u8>> { | ||||
|     bail!("merging documents is an error ({:?})", key.as_bstr()) | ||||
| } | ||||
| 
 | ||||
| pub fn merge_two_obkv(base: obkv::KvReader, update: obkv::KvReader, buffer: &mut Vec<u8>) { | ||||
|     use itertools::merge_join_by; | ||||
|     use itertools::EitherOrBoth::{Both, Left, Right}; | ||||
| 
 | ||||
|     buffer.clear(); | ||||
| 
 | ||||
|     let mut writer = obkv::KvWriter::new(buffer); | ||||
|     for eob in merge_join_by(base.iter(), update.iter(), |(b, _), (u, _)| b.cmp(u)) { | ||||
|         match eob { | ||||
|             Both(_, (k, v)) | Left((k, v)) | Right((k, v)) => writer.insert(k, v).unwrap(), | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     writer.finish().unwrap(); | ||||
| } | ||||
| @@ -1,4 +1,161 @@ | ||||
| use crate::Index; | ||||
| use std::borrow::Cow; | ||||
| use std::fs::File; | ||||
| use std::io::{self, Seek, SeekFrom}; | ||||
| use std::sync::mpsc::sync_channel; | ||||
| use std::time::Instant; | ||||
|  | ||||
| use anyhow::Context; | ||||
| use bstr::ByteSlice as _; | ||||
| use grenad::{Writer, Sorter, Merger, Reader, FileFuse, CompressionType}; | ||||
| use heed::types::ByteSlice; | ||||
| use log::{debug, info, error}; | ||||
| use rayon::prelude::*; | ||||
| use crate::index::Index; | ||||
| use self::store::Store; | ||||
| use self::merge_function::{ | ||||
|     main_merge, word_docids_merge, words_pairs_proximities_docids_merge, | ||||
|     docid_word_positions_merge, documents_merge, | ||||
| }; | ||||
| pub use self::transform::{Transform, TransformOutput}; | ||||
|  | ||||
| use super::UpdateBuilder; | ||||
|  | ||||
| mod merge_function; | ||||
| mod store; | ||||
| mod transform; | ||||
|  | ||||
/// Strategy used when writing entries into an LMDB database.
#[derive(Debug, Copy, Clone)]
enum WriteMethod {
    /// Entries are appended through a mutable database iterator.
    Append,
    /// Each key is looked up first; an existing value is merged with the
    /// incoming one before being written back.
    GetMergePut,
}
|  | ||||
| type MergeFn = for<'a> fn(&[u8], &[Cow<'a, [u8]>]) -> anyhow::Result<Vec<u8>>; | ||||
|  | ||||
| fn create_writer(typ: CompressionType, level: Option<u32>, file: File) -> io::Result<Writer<File>> { | ||||
|     let mut builder = Writer::builder(); | ||||
|     builder.compression_type(typ); | ||||
|     if let Some(level) = level { | ||||
|         builder.compression_level(level); | ||||
|     } | ||||
|     builder.build(file) | ||||
| } | ||||
|  | ||||
| fn create_sorter( | ||||
|     merge: MergeFn, | ||||
|     chunk_compression_type: CompressionType, | ||||
|     chunk_compression_level: Option<u32>, | ||||
|     chunk_fusing_shrink_size: Option<u64>, | ||||
|     max_nb_chunks: Option<usize>, | ||||
|     max_memory: Option<usize>, | ||||
| ) -> Sorter<MergeFn> | ||||
| { | ||||
|     let mut builder = Sorter::builder(merge); | ||||
|     if let Some(shrink_size) = chunk_fusing_shrink_size { | ||||
|         builder.file_fusing_shrink_size(shrink_size); | ||||
|     } | ||||
|     builder.chunk_compression_type(chunk_compression_type); | ||||
|     if let Some(level) = chunk_compression_level { | ||||
|         builder.chunk_compression_level(level); | ||||
|     } | ||||
|     if let Some(nb_chunks) = max_nb_chunks { | ||||
|         builder.max_nb_chunks(nb_chunks); | ||||
|     } | ||||
|     if let Some(memory) = max_memory { | ||||
|         builder.max_memory(memory); | ||||
|     } | ||||
|     builder.build() | ||||
| } | ||||
|  | ||||
| fn writer_into_reader(writer: Writer<File>, shrink_size: Option<u64>) -> anyhow::Result<Reader<FileFuse>> { | ||||
|     let mut file = writer.into_inner()?; | ||||
|     file.seek(SeekFrom::Start(0))?; | ||||
|     let file = if let Some(shrink_size) = shrink_size { | ||||
|         FileFuse::builder().shrink_size(shrink_size).build(file) | ||||
|     } else { | ||||
|         FileFuse::new(file) | ||||
|     }; | ||||
|     Reader::new(file).map_err(Into::into) | ||||
| } | ||||
|  | ||||
| fn merge_readers(sources: Vec<Reader<FileFuse>>, merge: MergeFn) -> Merger<FileFuse, MergeFn> { | ||||
|     let mut builder = Merger::builder(merge); | ||||
|     builder.extend(sources); | ||||
|     builder.build() | ||||
| } | ||||
|  | ||||
| fn merge_into_lmdb_database( | ||||
|     wtxn: &mut heed::RwTxn, | ||||
|     database: heed::PolyDatabase, | ||||
|     sources: Vec<Reader<FileFuse>>, | ||||
|     merge: MergeFn, | ||||
|     method: WriteMethod, | ||||
| ) -> anyhow::Result<()> { | ||||
|     debug!("Merging {} MTBL stores...", sources.len()); | ||||
|     let before = Instant::now(); | ||||
|  | ||||
|     let merger = merge_readers(sources, merge); | ||||
|     let mut in_iter = merger.into_merge_iter()?; | ||||
|  | ||||
|     match method { | ||||
|         WriteMethod::Append => { | ||||
|             let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?; | ||||
|             while let Some((k, v)) = in_iter.next()? { | ||||
|                 out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?; | ||||
|             } | ||||
|         }, | ||||
|         WriteMethod::GetMergePut => { | ||||
|             while let Some((k, v)) = in_iter.next()? { | ||||
|                 match database.get::<_, ByteSlice, ByteSlice>(wtxn, k)? { | ||||
|                     Some(old_val) => { | ||||
|                         let vals = vec![Cow::Borrowed(old_val), Cow::Borrowed(v)]; | ||||
|                         let val = merge(k, &vals).expect("merge failed"); | ||||
|                         database.put::<_, ByteSlice, ByteSlice>(wtxn, k, &val)? | ||||
|                     }, | ||||
|                     None => database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?, | ||||
|                 } | ||||
|             } | ||||
|         }, | ||||
|     } | ||||
|  | ||||
|     debug!("MTBL stores merged in {:.02?}!", before.elapsed()); | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| fn write_into_lmdb_database( | ||||
|     wtxn: &mut heed::RwTxn, | ||||
|     database: heed::PolyDatabase, | ||||
|     mut reader: Reader<FileFuse>, | ||||
|     merge: MergeFn, | ||||
|     method: WriteMethod, | ||||
| ) -> anyhow::Result<()> { | ||||
|     debug!("Writing MTBL stores..."); | ||||
|     let before = Instant::now(); | ||||
|  | ||||
|     match method { | ||||
|         WriteMethod::Append => { | ||||
|             let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?; | ||||
|             while let Some((k, v)) = reader.next()? { | ||||
|                 out_iter.append(k, v).with_context(|| format!("writing {:?} into LMDB", k.as_bstr()))?; | ||||
|             } | ||||
|         }, | ||||
|         WriteMethod::GetMergePut => { | ||||
|             while let Some((k, v)) = reader.next()? { | ||||
|                 match database.get::<_, ByteSlice, ByteSlice>(wtxn, k)? { | ||||
|                     Some(old_val) => { | ||||
|                         let vals = vec![Cow::Borrowed(old_val), Cow::Borrowed(v)]; | ||||
|                         let val = merge(k, &vals).expect("merge failed"); | ||||
|                         database.put::<_, ByteSlice, ByteSlice>(wtxn, k, &val)? | ||||
|                     }, | ||||
|                     None => database.put::<_, ByteSlice, ByteSlice>(wtxn, k, v)?, | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     debug!("MTBL stores merged in {:.02?}!", before.elapsed()); | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] | ||||
| pub enum IndexDocumentsMethod { | ||||
| @@ -14,12 +171,72 @@ pub enum IndexDocumentsMethod { | ||||
| pub struct IndexDocuments<'t, 'u, 'i> { | ||||
|     wtxn: &'t mut heed::RwTxn<'u>, | ||||
|     index: &'i Index, | ||||
|     log_every_n: Option<usize>, | ||||
|     max_nb_chunks: Option<usize>, | ||||
|     max_memory: Option<usize>, | ||||
|     linked_hash_map_size: Option<usize>, | ||||
|     chunk_compression_type: CompressionType, | ||||
|     chunk_compression_level: Option<u32>, | ||||
|     chunk_fusing_shrink_size: Option<u64>, | ||||
|     indexing_jobs: Option<usize>, | ||||
|     update_method: IndexDocumentsMethod, | ||||
| } | ||||
|  | ||||
| impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> { | ||||
|     pub fn new(wtxn: &'t mut heed::RwTxn<'u>, index: &'i Index) -> IndexDocuments<'t, 'u, 'i> { | ||||
|         IndexDocuments { wtxn, index, update_method: IndexDocumentsMethod::ReplaceDocuments } | ||||
|         IndexDocuments { | ||||
|             wtxn, | ||||
|             index, | ||||
|             log_every_n: None, | ||||
|             max_nb_chunks: None, | ||||
|             max_memory: None, | ||||
|             linked_hash_map_size: None, | ||||
|             chunk_compression_type: CompressionType::None, | ||||
|             chunk_compression_level: None, | ||||
|             chunk_fusing_shrink_size: None, | ||||
|             indexing_jobs: None, | ||||
|             update_method: IndexDocumentsMethod::ReplaceDocuments | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub(crate) fn log_every_n(&mut self, log_every_n: usize) -> &mut Self { | ||||
|         self.log_every_n = Some(log_every_n); | ||||
|         self | ||||
|     } | ||||
|  | ||||
|     pub(crate) fn max_nb_chunks(&mut self, max_nb_chunks: usize) -> &mut Self { | ||||
|         self.max_nb_chunks = Some(max_nb_chunks); | ||||
|         self | ||||
|     } | ||||
|  | ||||
|     pub(crate) fn max_memory(&mut self, max_memory: usize) -> &mut Self { | ||||
|         self.max_memory = Some(max_memory); | ||||
|         self | ||||
|     } | ||||
|  | ||||
|     pub(crate) fn linked_hash_map_size(&mut self, linked_hash_map_size: usize) -> &mut Self { | ||||
|         self.linked_hash_map_size = Some(linked_hash_map_size); | ||||
|         self | ||||
|     } | ||||
|  | ||||
|     pub(crate) fn chunk_compression_type(&mut self, chunk_compression_type: CompressionType) -> &mut Self { | ||||
|         self.chunk_compression_type = chunk_compression_type; | ||||
|         self | ||||
|     } | ||||
|  | ||||
|     pub(crate) fn chunk_compression_level(&mut self, chunk_compression_level: u32) -> &mut Self { | ||||
|         self.chunk_compression_level = Some(chunk_compression_level); | ||||
|         self | ||||
|     } | ||||
|  | ||||
|     pub(crate) fn chunk_fusing_shrink_size(&mut self, chunk_fusing_shrink_size: u64) -> &mut Self { | ||||
|         self.chunk_fusing_shrink_size = Some(chunk_fusing_shrink_size); | ||||
|         self | ||||
|     } | ||||
|  | ||||
|     pub(crate) fn indexing_jobs(&mut self, indexing_jobs: usize) -> &mut Self { | ||||
|         self.indexing_jobs = Some(indexing_jobs); | ||||
|         self | ||||
|     } | ||||
|  | ||||
|     pub fn index_documents_method(&mut self, method: IndexDocumentsMethod) -> &mut Self { | ||||
| @@ -27,7 +244,228 @@ impl<'t, 'u, 'i> IndexDocuments<'t, 'u, 'i> { | ||||
|         self | ||||
|     } | ||||
|  | ||||
|     pub fn execute(self) -> anyhow::Result<()> { | ||||
|         todo!() | ||||
    /// Indexes the CSV documents read from `reader` into the index.
    ///
    /// The CSV is first run through `Transform::from_csv`. The documents that this
    /// update replaces are then deleted, the documents are indexed by `Store`s running
    /// on a rayon thread pool, and the produced readers are merged and written into LMDB.
    ///
    /// `progress_callback` is handed to every `Store::index` call.
    ///
    /// # Errors
    ///
    /// Returns the first error raised by the transform, the deletion, the thread pool
    /// creation, one of the stores, one of the merges or one of the LMDB writes.
|     pub fn execute<R, F>(self, reader: R, progress_callback: F) -> anyhow::Result<()> | ||||
|     where | ||||
|         R: io::Read, | ||||
|         F: Fn(usize, usize) + Sync, | ||||
|     { | ||||
|         let before_indexing = Instant::now(); | ||||
|  | ||||
|         let transform = Transform { | ||||
|             rtxn: &self.wtxn, | ||||
|             index: self.index, | ||||
|             chunk_compression_type: self.chunk_compression_type, | ||||
|             chunk_compression_level: self.chunk_compression_level, | ||||
|             chunk_fusing_shrink_size: self.chunk_fusing_shrink_size, | ||||
|             max_nb_chunks: self.max_nb_chunks, | ||||
|             max_memory: self.max_memory, | ||||
|             index_documents_method: self.update_method, | ||||
|         }; | ||||
|  | ||||
|         let TransformOutput { | ||||
|             fields_ids_map, | ||||
|             users_ids_documents_ids, | ||||
|             new_documents_ids, | ||||
|             replaced_documents_ids, | ||||
|             documents_count, | ||||
|             documents_file, | ||||
|         } = transform.from_csv(reader)?; | ||||
|  | ||||
|         // We delete the documents that this document addition replaces. This way we are | ||||
|         // able to simply insert all the documents even if they already exist in the database. | ||||
|         if !replaced_documents_ids.is_empty() { | ||||
|             let update_builder = UpdateBuilder { | ||||
|                 log_every_n: self.log_every_n, | ||||
|                 max_nb_chunks: self.max_nb_chunks, | ||||
|                 max_memory: self.max_memory, | ||||
|                 linked_hash_map_size: self.linked_hash_map_size, | ||||
|                 chunk_compression_type: self.chunk_compression_type, | ||||
|                 chunk_compression_level: self.chunk_compression_level, | ||||
|                 chunk_fusing_shrink_size: self.chunk_fusing_shrink_size, | ||||
|                 indexing_jobs: self.indexing_jobs, | ||||
|             }; | ||||
|             let mut deletion_builder = update_builder.delete_documents(self.wtxn, self.index)?; | ||||
|             deletion_builder.delete_documents(&replaced_documents_ids); | ||||
|             let _deleted_documents_count = deletion_builder.execute()?; | ||||
|         } | ||||
|  | ||||
        // NOTE(review): no SAFETY justification is given for this mmap; it presumably relies
        // on the transform documents file not being modified while it is mapped — confirm.
|         let mmap = unsafe { | ||||
|             memmap::Mmap::map(&documents_file).context("mmaping the transform documents file")? | ||||
|         }; | ||||
|         let documents = grenad::Reader::new(mmap.as_ref())?; | ||||
|  | ||||
|         // The enum which indicates the type of the readers | ||||
|         // merges that are potentially done on different threads. | ||||
|         enum DatabaseType { | ||||
|             Main, | ||||
|             WordDocids, | ||||
|             WordsPairsProximitiesDocids, | ||||
|         } | ||||
|  | ||||
        // The settings are copied into locals so that the closures below
        // use these copies rather than reading the fields through `self`.
|         let linked_hash_map_size = self.linked_hash_map_size; | ||||
|         let max_nb_chunks = self.max_nb_chunks; | ||||
|         let max_memory = self.max_memory; | ||||
|         let chunk_compression_type = self.chunk_compression_type; | ||||
|         let chunk_compression_level = self.chunk_compression_level; | ||||
|         let log_every_n = self.log_every_n; | ||||
|         let chunk_fusing_shrink_size = self.chunk_fusing_shrink_size; | ||||
|  | ||||
        // When no job count was set, 0 is given to rayon — presumably this lets
        // rayon pick the number of threads itself; confirm against the rayon docs.
|         let jobs = self.indexing_jobs.unwrap_or(0); | ||||
|         let pool = rayon::ThreadPoolBuilder::new().num_threads(jobs).build()?; | ||||
|  | ||||
|         let (receiver, docid_word_positions_readers, documents_readers) = pool.install(|| { | ||||
|             let num_threads = rayon::current_num_threads(); | ||||
|             let max_memory_by_job = max_memory.map(|mm| mm / num_threads); | ||||
|  | ||||
            // The documents reader is repeated once per thread and each store
            // receives its own index along with the total number of threads.
|             let readers = rayon::iter::repeatn(documents, num_threads) | ||||
|                 .enumerate() | ||||
|                 .map(|(i, documents)| { | ||||
|                     let store = Store::new( | ||||
|                         linked_hash_map_size, | ||||
|                         max_nb_chunks, | ||||
|                         max_memory_by_job, | ||||
|                         chunk_compression_type, | ||||
|                         chunk_compression_level, | ||||
|                         chunk_fusing_shrink_size, | ||||
|                     )?; | ||||
|                     store.index( | ||||
|                         documents, | ||||
|                         documents_count, | ||||
|                         i, | ||||
|                         num_threads, | ||||
|                         log_every_n, | ||||
|                         &progress_callback, | ||||
|                     ) | ||||
|                 }) | ||||
|                 .collect::<Result<Vec<_>, _>>()?; | ||||
|  | ||||
            // The readers of every store are regrouped by the database they target.
|             let mut main_readers = Vec::with_capacity(readers.len()); | ||||
|             let mut word_docids_readers = Vec::with_capacity(readers.len()); | ||||
|             let mut docid_word_positions_readers = Vec::with_capacity(readers.len()); | ||||
|             let mut words_pairs_proximities_docids_readers = Vec::with_capacity(readers.len()); | ||||
|             let mut documents_readers = Vec::with_capacity(readers.len()); | ||||
|             readers.into_iter().for_each(|readers| { | ||||
|                 main_readers.push(readers.main); | ||||
|                 word_docids_readers.push(readers.word_docids); | ||||
|                 docid_word_positions_readers.push(readers.docid_word_positions); | ||||
|                 words_pairs_proximities_docids_readers.push(readers.words_pairs_proximities_docids); | ||||
|                 documents_readers.push(readers.documents); | ||||
|             }); | ||||
|  | ||||
|             // This is the function that merges the readers | ||||
|             // by using the given merge function. | ||||
|             let merge_readers = move |readers, merge| { | ||||
|                 let mut writer = tempfile::tempfile().and_then(|f| { | ||||
|                     create_writer(chunk_compression_type, chunk_compression_level, f) | ||||
|                 })?; | ||||
|                 let merger = merge_readers(readers, merge); | ||||
|                 merger.write_into(&mut writer)?; | ||||
|                 writer_into_reader(writer, chunk_fusing_shrink_size) | ||||
|             }; | ||||
|  | ||||
|             // The enum and the channel which is used to transfer | ||||
|             // the readers merges potentially done on another thread. | ||||
            // The channel has three slots: one per merge result sent below.
|             let (sender, receiver) = sync_channel(3); | ||||
|  | ||||
|             debug!("Merging the main, word docids and words pairs proximity docids in parallel..."); | ||||
|             rayon::spawn(move || { | ||||
|                 vec![ | ||||
|                     (DatabaseType::Main, main_readers, main_merge as MergeFn), | ||||
|                     (DatabaseType::WordDocids, word_docids_readers, word_docids_merge), | ||||
|                     ( | ||||
|                         DatabaseType::WordsPairsProximitiesDocids, | ||||
|                         words_pairs_proximities_docids_readers, | ||||
|                         words_pairs_proximities_docids_merge, | ||||
|                     ), | ||||
|                 ] | ||||
|                 .into_par_iter() | ||||
|                 .for_each(|(dbtype, readers, merge)| { | ||||
|                     let result = merge_readers(readers, merge); | ||||
|                     if let Err(e) = sender.send((dbtype, result)) { | ||||
|                         error!("sender error: {}", e); | ||||
|                     } | ||||
|                 }); | ||||
|             }); | ||||
|  | ||||
|             Ok((receiver, docid_word_positions_readers, documents_readers)) as anyhow::Result<_> | ||||
|         })?; | ||||
|  | ||||
        // The documents ids are read after the deletion above. Appending is only
        // selected when the index holds no documents; otherwise entries are merged.
|         let mut documents_ids = self.index.documents_ids(self.wtxn)?; | ||||
|         let contains_documents = !documents_ids.is_empty(); | ||||
|         let write_method = if contains_documents { | ||||
|             WriteMethod::GetMergePut | ||||
|         } else { | ||||
|             WriteMethod::Append | ||||
|         }; | ||||
|  | ||||
|         // We write the fields ids map into the main database | ||||
|         self.index.put_fields_ids_map(self.wtxn, &fields_ids_map)?; | ||||
|  | ||||
|         // We write the users_ids_documents_ids into the main database. | ||||
|         self.index.put_users_ids_documents_ids(self.wtxn, &users_ids_documents_ids)?; | ||||
|  | ||||
|         // We merge the new documents ids with the existing ones. | ||||
|         documents_ids.union_with(&new_documents_ids); | ||||
|         self.index.put_documents_ids(self.wtxn, &documents_ids)?; | ||||
|  | ||||
|         debug!("Writing the docid word positions into LMDB on disk..."); | ||||
|         merge_into_lmdb_database( | ||||
|             self.wtxn, | ||||
|             *self.index.docid_word_positions.as_polymorph(), | ||||
|             docid_word_positions_readers, | ||||
|             docid_word_positions_merge, | ||||
|             write_method | ||||
|         )?; | ||||
|  | ||||
|         debug!("Writing the documents into LMDB on disk..."); | ||||
|         merge_into_lmdb_database( | ||||
|             self.wtxn, | ||||
|             *self.index.documents.as_polymorph(), | ||||
|             documents_readers, | ||||
|             documents_merge, | ||||
|             write_method | ||||
|         )?; | ||||
|  | ||||
        // Each merge result is written as soon as it is received; the loop
        // ends once the sending side of the channel has been dropped.
|         for (db_type, result) in receiver { | ||||
|             let content = result?; | ||||
|             match db_type { | ||||
|                 DatabaseType::Main => { | ||||
|                     debug!("Writing the main elements into LMDB on disk..."); | ||||
|                     write_into_lmdb_database( | ||||
|                         self.wtxn, | ||||
|                         self.index.main, | ||||
|                         content, | ||||
|                         main_merge, | ||||
|                         write_method, | ||||
|                     )?; | ||||
|                 }, | ||||
|                 DatabaseType::WordDocids => { | ||||
|                     debug!("Writing the words docids into LMDB on disk..."); | ||||
|                     let db = *self.index.word_docids.as_polymorph(); | ||||
|                     write_into_lmdb_database( | ||||
|                         self.wtxn, | ||||
|                         db, | ||||
|                         content, | ||||
|                         word_docids_merge, | ||||
|                         write_method, | ||||
|                     )?; | ||||
|                 }, | ||||
|                 DatabaseType::WordsPairsProximitiesDocids => { | ||||
|                     debug!("Writing the words pairs proximities docids into LMDB on disk..."); | ||||
|                     let db = *self.index.word_pair_proximity_docids.as_polymorph(); | ||||
|                     write_into_lmdb_database( | ||||
|                         self.wtxn, | ||||
|                         db, | ||||
|                         content, | ||||
|                         words_pairs_proximities_docids_merge, | ||||
|                         write_method, | ||||
|                     )?; | ||||
|                 }, | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         info!("Update processed in {:.02?}", before_indexing.elapsed()); | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -56,7 +56,7 @@ pub struct Store { | ||||
| 
 | ||||
| impl Store { | ||||
|     pub fn new( | ||||
|         linked_hash_map_size: usize, | ||||
|         linked_hash_map_size: Option<usize>, | ||||
|         max_nb_chunks: Option<usize>, | ||||
|         max_memory: Option<usize>, | ||||
|         chunk_compression_type: CompressionType, | ||||
| @@ -66,6 +66,7 @@ impl Store { | ||||
|     { | ||||
|         // We divide the max memory by the number of sorters the Store has.
 | ||||
|         let max_memory = max_memory.map(|mm| cmp::max(ONE_KILOBYTE, mm / 3)); | ||||
|         let linked_hash_map_size = linked_hash_map_size.unwrap_or(500); | ||||
| 
 | ||||
|         let main_sorter = create_sorter( | ||||
|             main_merge, | ||||
| @@ -280,13 +281,13 @@ impl Store { | ||||
|     pub fn index<F>( | ||||
|         mut self, | ||||
|         mut documents: grenad::Reader<&[u8]>, | ||||
|         documents_count: u32, | ||||
|         documents_count: usize, | ||||
|         thread_index: usize, | ||||
|         num_threads: usize, | ||||
|         log_every_n: usize, | ||||
|         log_every_n: Option<usize>, | ||||
|         mut progress_callback: F, | ||||
|     ) -> anyhow::Result<Readers> | ||||
|     where F: FnMut(u32, u32), | ||||
|     where F: FnMut(usize, usize), | ||||
|     { | ||||
|         debug!("{:?}: Indexing in a Store...", thread_index); | ||||
| 
 | ||||
| @@ -301,9 +302,9 @@ impl Store { | ||||
|             // We skip documents that must not be indexed by this thread.
 | ||||
|             if count % num_threads == thread_index { | ||||
|                 // This is a log routine that we do every `log_every_n` documents.
 | ||||
|                 if count % log_every_n == 0 { | ||||
|                 if log_every_n.map_or(false, |len| count % len == 0) { | ||||
|                     info!("We have seen {} documents so far ({:.02?}).", format_count(count), before.elapsed()); | ||||
|                     progress_callback(count as u32, documents_count); | ||||
|                     progress_callback(count, documents_count); | ||||
|                     before = Instant::now(); | ||||
|                 } | ||||
| 
 | ||||
| @@ -325,7 +326,7 @@ impl Store { | ||||
|             count = count + 1; | ||||
|         } | ||||
| 
 | ||||
|         progress_callback(count as u32, documents_count); | ||||
|         progress_callback(count, documents_count); | ||||
| 
 | ||||
|         let readers = self.finish()?; | ||||
|         debug!("{:?}: Store created!", thread_index); | ||||
| @@ -8,9 +8,10 @@ use fst::{IntoStreamer, Streamer}; | ||||
| use grenad::CompressionType; | ||||
| use roaring::RoaringBitmap; | ||||
| 
 | ||||
| use crate::FieldsIdsMap; | ||||
| use crate::{BEU32, Index, FieldsIdsMap}; | ||||
| use crate::update::AvailableDocumentsIds; | ||||
| use super::{create_writer, create_sorter}; | ||||
| use super::merge_function::merge_two_obkv; | ||||
| use super::{create_writer, create_sorter, IndexDocumentsMethod}; | ||||
| 
 | ||||
| pub struct TransformOutput { | ||||
|     pub fields_ids_map: FieldsIdsMap, | ||||
| @@ -21,41 +22,30 @@ pub struct TransformOutput { | ||||
|     pub documents_file: File, | ||||
| } | ||||
| 
 | ||||
| pub struct Transform<A> { | ||||
|     pub fields_ids_map: FieldsIdsMap, | ||||
|     pub available_documents_ids: AvailableDocumentsIds, | ||||
|     pub users_ids_documents_ids: fst::Map<A>, | ||||
| pub struct Transform<'t, 'i> { | ||||
|     pub rtxn: &'t heed::RoTxn, | ||||
|     pub index: &'i Index, | ||||
|     pub chunk_compression_type: CompressionType, | ||||
|     pub chunk_compression_level: Option<u32>, | ||||
|     pub chunk_fusing_shrink_size: Option<u64>, | ||||
|     pub max_nb_chunks: Option<usize>, | ||||
|     pub max_memory: Option<usize>, | ||||
|     pub index_documents_method: IndexDocumentsMethod, | ||||
| } | ||||
| 
 | ||||
| fn merge_two_obkv(base: obkv::KvReader, update: obkv::KvReader, buffer: &mut Vec<u8>) { | ||||
|     use itertools::merge_join_by; | ||||
|     use itertools::EitherOrBoth::{Both, Left, Right}; | ||||
| 
 | ||||
|     buffer.clear(); | ||||
| 
 | ||||
|     let mut writer = obkv::KvWriter::new(buffer); | ||||
|     for eob in merge_join_by(base.iter(), update.iter(), |(b, _), (u, _)| b.cmp(u)) { | ||||
|         match eob { | ||||
|             Both(_, (k, v)) | Left((k, v)) | Right((k, v)) => writer.insert(k, v).unwrap(), | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     writer.finish().unwrap(); | ||||
| } | ||||
| 
 | ||||
| impl<A: AsRef<[u8]>> Transform<A> { | ||||
| impl Transform<'_, '_> { | ||||
|     /// Extract the users ids, deduplicate and compute the new internal documents ids
 | ||||
|     /// and fields ids, writing all the documents under their internal ids into a final file.
 | ||||
|     ///
 | ||||
|     /// Outputs the new `FieldsIdsMap`, the new `UsersIdsDocumentsIds` map, the new documents ids,
 | ||||
|     /// the replaced documents ids, the number of documents in this update and the file
 | ||||
|     /// containing all those documents.
 | ||||
|     pub fn from_csv<R: Read>(mut self, reader: R) -> anyhow::Result<TransformOutput> { | ||||
|     pub fn from_csv<R: Read>(self, reader: R) -> anyhow::Result<TransformOutput> { | ||||
|         let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?; | ||||
|         let documents_ids = self.index.documents_ids(self.rtxn)?; | ||||
|         let mut available_documents_ids = AvailableDocumentsIds::from_documents_ids(&documents_ids); | ||||
|         let users_ids_documents_ids = self.index.users_ids_documents_ids(self.rtxn).unwrap(); | ||||
| 
 | ||||
|         let mut csv = csv::Reader::from_reader(reader); | ||||
|         let headers = csv.headers()?.clone(); | ||||
|         let user_id_pos = headers.iter().position(|h| h == "id").context(r#"missing "id" header"#)?; | ||||
| @@ -63,7 +53,7 @@ impl<A: AsRef<[u8]>> Transform<A> { | ||||
|         // Generate the new fields ids based on the current fields ids and this CSV headers.
 | ||||
|         let mut fields_ids = Vec::new(); | ||||
|         for header in headers.iter() { | ||||
|             let id = self.fields_ids_map.insert(header) | ||||
|             let id = fields_ids_map.insert(header) | ||||
|                 .context("impossible to generate a field id (limit reached)")?; | ||||
|             fields_ids.push(id); | ||||
|         } | ||||
| @@ -120,7 +110,7 @@ impl<A: AsRef<[u8]>> Transform<A> { | ||||
|         let mut iter = sorter.into_iter()?; | ||||
|         while let Some((user_id, update_obkv)) = iter.next()? { | ||||
| 
 | ||||
|             let (docid, obkv) = match self.users_ids_documents_ids.get(user_id) { | ||||
|             let (docid, obkv) = match users_ids_documents_ids.get(user_id) { | ||||
|                 Some(docid) => { | ||||
|                     // If we find the user id in the current users ids documents ids map
 | ||||
|                     // we use it and insert it in the list of replaced documents.
 | ||||
| @@ -129,20 +119,22 @@ impl<A: AsRef<[u8]>> Transform<A> { | ||||
| 
 | ||||
|                     // Depending on the update indexing method we will merge
 | ||||
|                     // the document update with the current document or not.
 | ||||
|                     let must_merge_documents = false; | ||||
|                     if must_merge_documents { | ||||
|                         let base_obkv = todo!(); | ||||
|                         let update_obkv = obkv::KvReader::new(update_obkv); | ||||
|                         merge_two_obkv(base_obkv, update_obkv, &mut obkv_buffer); | ||||
|                         (docid, obkv_buffer.as_slice()) | ||||
|                     } else { | ||||
|                         (docid, update_obkv) | ||||
|                     match self.index_documents_method { | ||||
|                         IndexDocumentsMethod::ReplaceDocuments => (docid, update_obkv), | ||||
|                         IndexDocumentsMethod::UpdateDocuments => { | ||||
|                             let key = BEU32::new(docid); | ||||
|                             let base_obkv = self.index.documents.get(&self.rtxn, &key)? | ||||
|                                 .context("document not found")?; | ||||
|                             let update_obkv = obkv::KvReader::new(update_obkv); | ||||
|                             merge_two_obkv(base_obkv, update_obkv, &mut obkv_buffer); | ||||
|                             (docid, obkv_buffer.as_slice()) | ||||
|                         } | ||||
|                     } | ||||
|                 }, | ||||
|                 None => { | ||||
|                     // If this user id is new we add it to the users ids documents ids map
 | ||||
|                     // for new ids and into the list of new documents.
 | ||||
|                     let new_docid = self.available_documents_ids.next() | ||||
|                     let new_docid = available_documents_ids.next() | ||||
|                         .context("no more available documents ids")?; | ||||
|                     new_users_ids_documents_ids_builder.insert(user_id, new_docid as u64)?; | ||||
|                     new_documents_ids.insert(new_docid); | ||||
| @@ -163,7 +155,7 @@ impl<A: AsRef<[u8]>> Transform<A> { | ||||
|         // We create the union between the existing users ids documents ids with the new ones.
 | ||||
|         let new_users_ids_documents_ids = new_users_ids_documents_ids_builder.into_map(); | ||||
|         let union_ = fst::map::OpBuilder::new() | ||||
|             .add(&self.users_ids_documents_ids) | ||||
|             .add(&users_ids_documents_ids) | ||||
|             .add(&new_users_ids_documents_ids) | ||||
|             .r#union(); | ||||
| 
 | ||||
| @@ -176,7 +168,7 @@ impl<A: AsRef<[u8]>> Transform<A> { | ||||
|         } | ||||
| 
 | ||||
|         Ok(TransformOutput { | ||||
|             fields_ids_map: self.fields_ids_map, | ||||
|             fields_ids_map, | ||||
|             users_ids_documents_ids: users_ids_documents_ids_builder.into_map(), | ||||
|             new_documents_ids, | ||||
|             replaced_documents_ids, | ||||
| @@ -1,35 +1,37 @@ | ||||
| use std::borrow::Cow; | ||||
| use std::convert::TryFrom; | ||||
|  | ||||
| use fst::{IntoStreamer, Streamer}; | ||||
| use grenad::CompressionType; | ||||
| use itertools::Itertools; | ||||
| use roaring::RoaringBitmap; | ||||
|  | ||||
| use crate::{Index, BEU32}; | ||||
| use crate::Index; | ||||
| use super::clear_documents::ClearDocuments; | ||||
| use super::delete_documents::DeleteDocuments; | ||||
| use super::index_documents::IndexDocuments; | ||||
|  | ||||
| pub struct UpdateBuilder { | ||||
|     log_every_n: usize, | ||||
|     max_nb_chunks: Option<usize>, | ||||
|     max_memory: usize, | ||||
|     linked_hash_map_size: usize, | ||||
|     chunk_compression_type: CompressionType, | ||||
|     chunk_compression_level: Option<u32>, | ||||
|     chunk_fusing_shrink_size: u64, | ||||
|     enable_chunk_fusing: bool, | ||||
|     indexing_jobs: Option<usize>, | ||||
|     pub(crate) log_every_n: Option<usize>, | ||||
|     pub(crate) max_nb_chunks: Option<usize>, | ||||
|     pub(crate) max_memory: Option<usize>, | ||||
|     pub(crate) linked_hash_map_size: Option<usize>, | ||||
|     pub(crate) chunk_compression_type: CompressionType, | ||||
|     pub(crate) chunk_compression_level: Option<u32>, | ||||
|     pub(crate) chunk_fusing_shrink_size: Option<u64>, | ||||
|     pub(crate) indexing_jobs: Option<usize>, | ||||
| } | ||||
|  | ||||
| impl UpdateBuilder { | ||||
|     pub fn new() -> UpdateBuilder { | ||||
|         todo!() | ||||
|         UpdateBuilder { | ||||
|             log_every_n: None, | ||||
|             max_nb_chunks: None, | ||||
|             max_memory: None, | ||||
|             linked_hash_map_size: None, | ||||
|             chunk_compression_type: CompressionType::None, | ||||
|             chunk_compression_level: None, | ||||
|             chunk_fusing_shrink_size: None, | ||||
|             indexing_jobs: None, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn log_every_n(&mut self, log_every_n: usize) -> &mut Self { | ||||
|         self.log_every_n = log_every_n; | ||||
|         self.log_every_n = Some(log_every_n); | ||||
|         self | ||||
|     } | ||||
|  | ||||
| @@ -39,12 +41,12 @@ impl UpdateBuilder { | ||||
|     } | ||||
|  | ||||
|     pub fn max_memory(&mut self, max_memory: usize) -> &mut Self { | ||||
|         self.max_memory = max_memory; | ||||
|         self.max_memory = Some(max_memory); | ||||
|         self | ||||
|     } | ||||
|  | ||||
|     pub fn linked_hash_map_size(&mut self, linked_hash_map_size: usize) -> &mut Self { | ||||
|         self.linked_hash_map_size = linked_hash_map_size; | ||||
|         self.linked_hash_map_size = Some(linked_hash_map_size); | ||||
|         self | ||||
|     } | ||||
|  | ||||
| @@ -59,12 +61,7 @@ impl UpdateBuilder { | ||||
|     } | ||||
|  | ||||
|     pub fn chunk_fusing_shrink_size(&mut self, chunk_fusing_shrink_size: u64) -> &mut Self { | ||||
|         self.chunk_fusing_shrink_size = chunk_fusing_shrink_size; | ||||
|         self | ||||
|     } | ||||
|  | ||||
|     pub fn enable_chunk_fusing(&mut self, enable_chunk_fusing: bool) -> &mut Self { | ||||
|         self.enable_chunk_fusing = enable_chunk_fusing; | ||||
|         self.chunk_fusing_shrink_size = Some(chunk_fusing_shrink_size); | ||||
|         self | ||||
|     } | ||||
|  | ||||
| @@ -97,6 +94,33 @@ impl UpdateBuilder { | ||||
|         index: &'i Index, | ||||
|     ) -> IndexDocuments<'t, 'u, 'i> | ||||
|     { | ||||
|         IndexDocuments::new(wtxn, index) | ||||
|         let mut builder = IndexDocuments::new(wtxn, index); | ||||
|  | ||||
|         if let Some(log_every_n) = self.log_every_n { | ||||
|             builder.log_every_n(log_every_n); | ||||
|         } | ||||
|         if let Some(max_nb_chunks) = self.max_nb_chunks { | ||||
|             builder.max_nb_chunks(max_nb_chunks); | ||||
|         } | ||||
|         if let Some(max_memory) = self.max_memory { | ||||
|             builder.max_memory(max_memory); | ||||
|         } | ||||
|         if let Some(linked_hash_map_size) = self.linked_hash_map_size { | ||||
|             builder.linked_hash_map_size(linked_hash_map_size); | ||||
|         } | ||||
|  | ||||
|         builder.chunk_compression_type(self.chunk_compression_type); | ||||
|  | ||||
|         if let Some(chunk_compression_level) = self.chunk_compression_level { | ||||
|             builder.chunk_compression_level(chunk_compression_level); | ||||
|         } | ||||
|         if let Some(chunk_fusing_shrink_size) = self.chunk_fusing_shrink_size { | ||||
|             builder.chunk_fusing_shrink_size(chunk_fusing_shrink_size); | ||||
|         } | ||||
|         if let Some(indexing_jobs) = self.indexing_jobs { | ||||
|             builder.indexing_jobs(indexing_jobs); | ||||
|         } | ||||
|  | ||||
|         builder | ||||
|     } | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user