mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-24 20:46:27 +00:00 
			
		
		
		
	Introduce prefix postings ids for better perfs
This commit is contained in:
		| @@ -7,15 +7,15 @@ use std::sync::atomic::{AtomicUsize, Ordering}; | ||||
| use anyhow::Context; | ||||
| use cow_utils::CowUtils; | ||||
| use fst::{Streamer, IntoStreamer}; | ||||
| use heed::EnvOpenOptions; | ||||
| use heed::types::*; | ||||
| use heed::{EnvOpenOptions, PolyDatabase, Database}; | ||||
| use oxidized_mtbl::{Reader, ReaderOptions, Writer, Merger, MergerOptions}; | ||||
| use rayon::prelude::*; | ||||
| use roaring::RoaringBitmap; | ||||
| use structopt::StructOpt; | ||||
|  | ||||
| use mega_mini_indexer::alphanumeric_tokens; | ||||
| use mega_mini_indexer::{FastMap4, SmallVec32, BEU32, DocumentId}; | ||||
| use mega_mini_indexer::{FastMap4, SmallVec32, BEU32, Index, DocumentId}; | ||||
|  | ||||
| #[cfg(target_os = "linux")] | ||||
| #[global_allocator] | ||||
| @@ -38,6 +38,7 @@ struct Opt { | ||||
| struct Indexed { | ||||
|     fst: fst::Set<Vec<u8>>, | ||||
|     postings_ids: FastMap4<SmallVec32, RoaringBitmap>, | ||||
|     prefix_postings_ids: FastMap4<SmallVec32, RoaringBitmap>, | ||||
|     headers: Vec<u8>, | ||||
|     documents: Vec<(DocumentId, Vec<u8>)>, | ||||
| } | ||||
| @@ -69,8 +70,21 @@ impl MtblKvStore { | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         // postings ids keys are all prefixed by a '2' | ||||
|         // We must write the prefix postings ids | ||||
|         key[0] = 2; | ||||
|         let mut stream = indexed.fst.stream(); | ||||
|         while let Some(word) = stream.next() { | ||||
|             key.truncate(1); | ||||
|             key.extend_from_slice(word); | ||||
|             if let Some(ids) = indexed.prefix_postings_ids.remove(word) { | ||||
|                 buffer.clear(); | ||||
|                 ids.serialize_into(&mut buffer)?; | ||||
|                 out.add(&key, &buffer).unwrap(); | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         // postings ids keys are all prefixed by a '2' | ||||
|         key[0] = 3; | ||||
|         indexed.documents.sort_unstable(); | ||||
|         for (id, content) in indexed.documents { | ||||
|             key.truncate(1); | ||||
| @@ -115,7 +129,7 @@ impl MtblKvStore { | ||||
|                 assert_eq!(left, right); | ||||
|                 Some(left.to_vec()) | ||||
|             } | ||||
|             else if key.starts_with(&[1]) { | ||||
|             else if key.starts_with(&[1]) || key.starts_with(&[2]) { | ||||
|                 let mut left = RoaringBitmap::deserialize_from(left).unwrap(); | ||||
|                 let right = RoaringBitmap::deserialize_from(right).unwrap(); | ||||
|                 left.union_with(&right); | ||||
| @@ -123,7 +137,7 @@ impl MtblKvStore { | ||||
|                 left.serialize_into(&mut vec).unwrap(); | ||||
|                 Some(vec) | ||||
|             } | ||||
|             else if key.starts_with(&[2]) { | ||||
|             else if key.starts_with(&[3]) { | ||||
|                 assert_eq!(left, right); | ||||
|                 Some(left.to_vec()) | ||||
|             } | ||||
| @@ -155,6 +169,7 @@ fn index_csv(mut rdr: csv::Reader<File>) -> anyhow::Result<MtblKvStore> { | ||||
|  | ||||
|     let mut document = csv::StringRecord::new(); | ||||
|     let mut postings_ids = FastMap4::default(); | ||||
|     let mut prefix_postings_ids = FastMap4::default(); | ||||
|     let mut documents = Vec::new(); | ||||
|  | ||||
|     // Write the headers into a Vec of bytes. | ||||
| @@ -174,6 +189,11 @@ fn index_csv(mut rdr: csv::Reader<File>) -> anyhow::Result<MtblKvStore> { | ||||
|                     postings_ids.entry(SmallVec32::from(word.as_bytes())) | ||||
|                         .or_insert_with(RoaringBitmap::new) | ||||
|                         .insert(document_id); | ||||
|                     if let Some(prefix) = word.as_bytes().get(0..word.len().min(4)) { | ||||
|                         prefix_postings_ids.entry(SmallVec32::from(prefix)) | ||||
|                             .or_insert_with(RoaringBitmap::new) | ||||
|                             .insert(document_id); | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
| @@ -185,7 +205,7 @@ fn index_csv(mut rdr: csv::Reader<File>) -> anyhow::Result<MtblKvStore> { | ||||
|         documents.push((document_id, document)); | ||||
|     } | ||||
|  | ||||
|     // We compute and store the postings list into the DB. | ||||
|     // We store the words from the postings. | ||||
|     let mut new_words = BTreeSet::default(); | ||||
|     for (word, _new_ids) in &postings_ids { | ||||
|         new_words.insert(word.clone()); | ||||
| @@ -193,20 +213,13 @@ fn index_csv(mut rdr: csv::Reader<File>) -> anyhow::Result<MtblKvStore> { | ||||
|  | ||||
|     let new_words_fst = fst::Set::from_iter(new_words.iter().map(SmallVec32::as_ref))?; | ||||
|  | ||||
|     let indexed = Indexed { fst: new_words_fst, headers, postings_ids, documents }; | ||||
|     let indexed = Indexed { fst: new_words_fst, headers, postings_ids, prefix_postings_ids, documents }; | ||||
|  | ||||
|     MtblKvStore::from_indexed(indexed) | ||||
| } | ||||
|  | ||||
| // TODO merge with the previous values | ||||
| fn writer( | ||||
|     wtxn: &mut heed::RwTxn, | ||||
|     main: PolyDatabase, | ||||
|     postings_ids: Database<Str, ByteSlice>, | ||||
|     documents: Database<OwnedType<BEU32>, ByteSlice>, | ||||
|     mtbl_store: MtblKvStore, | ||||
| ) -> anyhow::Result<usize> | ||||
| { | ||||
| fn writer(wtxn: &mut heed::RwTxn, index: Index, mtbl_store: MtblKvStore) -> anyhow::Result<usize> { | ||||
|     let mtbl_store = match mtbl_store.0 { | ||||
|         Some(store) => unsafe { memmap::Mmap::map(&store)? }, | ||||
|         None => return Ok(0), | ||||
| @@ -216,25 +229,32 @@ fn writer( | ||||
|     // Write the words fst | ||||
|     let fst = mtbl_store.get(b"\0words-fst").unwrap(); | ||||
|     let fst = fst::Set::new(fst)?; | ||||
|     main.put::<_, Str, ByteSlice>(wtxn, "words-fst", &fst.as_fst().as_bytes())?; | ||||
|     index.main.put::<_, Str, ByteSlice>(wtxn, "words-fst", &fst.as_fst().as_bytes())?; | ||||
|  | ||||
|     // Write and merge the headers | ||||
|     let headers = mtbl_store.get(b"\0headers").unwrap(); | ||||
|     main.put::<_, Str, ByteSlice>(wtxn, "headers", headers.as_ref())?; | ||||
|     index.main.put::<_, Str, ByteSlice>(wtxn, "headers", headers.as_ref())?; | ||||
|  | ||||
|     // Write and merge the postings lists | ||||
|     let mut iter = mtbl_store.iter_prefix(&[1]).unwrap(); | ||||
|     while let Some((word, postings)) = iter.next() { | ||||
|         let word = std::str::from_utf8(&word[1..]).unwrap(); | ||||
|         postings_ids.put(wtxn, &word, &postings)?; | ||||
|         index.postings_ids.put(wtxn, &word, &postings)?; | ||||
|     } | ||||
|  | ||||
|     // Write and merge the prefix postings lists | ||||
|     let mut iter = mtbl_store.iter_prefix(&[2]).unwrap(); | ||||
|     while let Some((word, postings)) = iter.next() { | ||||
|         let word = std::str::from_utf8(&word[1..]).unwrap(); | ||||
|         index.prefix_postings_ids.put(wtxn, &word, &postings)?; | ||||
|     } | ||||
|  | ||||
|     // Write the documents | ||||
|     let mut count = 0; | ||||
|     let mut iter = mtbl_store.iter_prefix(&[2]).unwrap(); | ||||
|     let mut iter = mtbl_store.iter_prefix(&[3]).unwrap(); | ||||
|     while let Some((id_bytes, content)) = iter.next() { | ||||
|         let id = id_bytes[1..].try_into().map(u32::from_be_bytes).unwrap(); | ||||
|         documents.put(wtxn, &BEU32::new(id), &content)?; | ||||
|         index.documents.put(wtxn, &BEU32::new(id), &content)?; | ||||
|         count += 1; | ||||
|     } | ||||
|  | ||||
| @@ -251,10 +271,7 @@ fn main() -> anyhow::Result<()> { | ||||
|         .max_dbs(5) | ||||
|         .open(opt.database)?; | ||||
|  | ||||
|     let main = env.create_poly_database(None)?; | ||||
|     let postings_ids: Database<Str, ByteSlice> = env.create_database(Some("postings-ids"))?; | ||||
|     let documents: Database<OwnedType<BEU32>, ByteSlice> = env.create_database(Some("documents"))?; | ||||
|  | ||||
|     let index = Index::new(&env)?; | ||||
|     let res = opt.files_to_index | ||||
|         .into_par_iter() | ||||
|         .try_fold(MtblKvStore::default, |acc, path| { | ||||
| @@ -271,7 +288,7 @@ fn main() -> anyhow::Result<()> { | ||||
|  | ||||
|     eprintln!("We are writing into LMDB..."); | ||||
|     let mut wtxn = env.write_txn()?; | ||||
|     let count = writer(&mut wtxn, main, postings_ids, documents, mtbl_store)?; | ||||
|     let count = writer(&mut wtxn, index, mtbl_store)?; | ||||
|     wtxn.commit()?; | ||||
|     eprintln!("Wrote {} documents into LMDB", count); | ||||
|  | ||||
|   | ||||
							
								
								
									
										24
									
								
								src/lib.rs
									
									
									
									
									
								
							
							
						
						
									
										24
									
								
								src/lib.rs
									
									
									
									
									
								
							| @@ -26,6 +26,7 @@ pub fn alphanumeric_tokens(string: &str) -> impl Iterator<Item = &str> { | ||||
| pub struct Index { | ||||
|     pub main: PolyDatabase, | ||||
|     pub postings_ids: Database<Str, ByteSlice>, | ||||
|     pub prefix_postings_ids: Database<Str, ByteSlice>, | ||||
|     pub documents: Database<OwnedType<BEU32>, ByteSlice>, | ||||
| } | ||||
|  | ||||
| @@ -33,11 +34,13 @@ impl Index { | ||||
|     pub fn new(env: &heed::Env) -> heed::Result<Index> { | ||||
|         let main = env.create_poly_database(None)?; | ||||
|         let postings_ids = env.create_database(Some("postings-ids"))?; | ||||
|         let prefix_postings_ids = env.create_database(Some("prefix-postings-ids"))?; | ||||
|         let documents = env.create_database(Some("documents"))?; | ||||
|  | ||||
|         Ok(Index { | ||||
|             main, | ||||
|             postings_ids, | ||||
|             prefix_postings_ids, | ||||
|             documents, | ||||
|         }) | ||||
|     } | ||||
| @@ -73,16 +76,23 @@ impl Index { | ||||
|         let mut intersect_result: Option<RoaringBitmap> = None; | ||||
|         for (word, dfa) in dfas { | ||||
|             let before = Instant::now(); | ||||
|  | ||||
|             let mut union_result = RoaringBitmap::default(); | ||||
|             let mut stream = fst.search(dfa).into_stream(); | ||||
|             while let Some(word) = stream.next() { | ||||
|                 let word = std::str::from_utf8(word)?; | ||||
|                 if let Some(ids) = self.postings_ids.get(rtxn, word)? { | ||||
|                     let right = RoaringBitmap::deserialize_from(ids)?; | ||||
|                     union_result.union_with(&right); | ||||
|             if word.len() <= 4 { | ||||
|                 if let Some(ids) = self.prefix_postings_ids.get(rtxn, &word[..word.len().min(4)])? { | ||||
|                     union_result = RoaringBitmap::deserialize_from(ids)?; | ||||
|                 } | ||||
|             } else { | ||||
|                 let mut stream = fst.search(dfa).into_stream(); | ||||
|                 while let Some(word) = stream.next() { | ||||
|                     let word = std::str::from_utf8(word)?; | ||||
|                     if let Some(ids) = self.postings_ids.get(rtxn, word)? { | ||||
|                         let right = RoaringBitmap::deserialize_from(ids)?; | ||||
|                         union_result.union_with(&right); | ||||
|                     } | ||||
|                 } | ||||
|                 eprintln!("union for {:?} took {:.02?}", word, before.elapsed()); | ||||
|             } | ||||
|             eprintln!("union for {:?} took {:.02?}", word, before.elapsed()); | ||||
|  | ||||
|             intersect_result = match intersect_result.take() { | ||||
|                 Some(mut left) => { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user