mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-26 05:26:27 +00:00 
			
		
		
		
	Introduce a new indexer which uses an MTBL sorter
This commit is contained in:
		
							
								
								
									
										2
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							| @@ -1 +1,3 @@ | ||||
| /target | ||||
| *.csv | ||||
| *.mmdb | ||||
|   | ||||
							
								
								
									
										27
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										27
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							| @@ -21,6 +21,14 @@ version = "1.0.31" | ||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
| checksum = "85bb70cc08ec97ca5450e6eba421deeea5f172c0fc61f78b5357b2a8e8be195f" | ||||
|  | ||||
| [[package]] | ||||
| name = "arc-cache" | ||||
| version = "0.2.4" | ||||
| source = "git+https://github.com/Kerollmops/rust-arc-cache.git?rev=56530f2#56530f2d219823f8f88dc03851f8fe057bd72564" | ||||
| dependencies = [ | ||||
|  "xlru-cache", | ||||
| ] | ||||
|  | ||||
| [[package]] | ||||
| name = "arc-swap" | ||||
| version = "0.4.6" | ||||
| @@ -892,6 +900,12 @@ version = "0.2.70" | ||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
| checksum = "3baa92041a6fec78c687fa0cc2b3fae8884f743d672cf551bed1d6dac6988d0f" | ||||
|  | ||||
| [[package]] | ||||
| name = "linked-hash-map" | ||||
| version = "0.5.3" | ||||
| source = "registry+https://github.com/rust-lang/crates.io-index" | ||||
| checksum = "8dd5a6d5999d9907cda8ed67bbd137d3af8085216c2ac62de5be860bd41f304a" | ||||
|  | ||||
| [[package]] | ||||
| name = "lmdb-rkv-sys" | ||||
| version = "0.11.0" | ||||
| @@ -963,6 +977,7 @@ name = "milli" | ||||
| version = "0.1.0" | ||||
| dependencies = [ | ||||
|  "anyhow", | ||||
|  "arc-cache", | ||||
|  "askama", | ||||
|  "askama_warp", | ||||
|  "bitpacking", | ||||
| @@ -1200,12 +1215,14 @@ checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c" | ||||
| [[package]] | ||||
| name = "oxidized-mtbl" | ||||
| version = "0.1.0" | ||||
| source = "git+https://github.com/Kerollmops/oxidized-mtbl.git?rev=9451be8#9451be8829562f7d1f8d34aa3ecb81c5106a0623" | ||||
| source = "git+https://github.com/Kerollmops/oxidized-mtbl.git?rev=6b8a3a8#6b8a3a83a8b83bfdba38f7ea67bfa5868e668741" | ||||
| dependencies = [ | ||||
|  "byteorder", | ||||
|  "crc32c", | ||||
|  "flate2", | ||||
|  "memmap", | ||||
|  "snap", | ||||
|  "tempfile", | ||||
|  "zstd", | ||||
| ] | ||||
|  | ||||
| @@ -2340,6 +2357,14 @@ dependencies = [ | ||||
|  "winapi-build", | ||||
| ] | ||||
|  | ||||
| [[package]] | ||||
| name = "xlru-cache" | ||||
| version = "0.1.2" | ||||
| source = "git+https://github.com/Kerollmops/rust-xlru-cache.git?rev=3c90f49#3c90f49e11758ee0cc4ff145b2606ba143188b77" | ||||
| dependencies = [ | ||||
|  "linked-hash-map", | ||||
| ] | ||||
|  | ||||
| [[package]] | ||||
| name = "zerocopy" | ||||
| version = "0.3.0" | ||||
|   | ||||
| @@ -18,7 +18,8 @@ jemallocator = "0.3.2" | ||||
| levenshtein_automata = { version = "0.2.0", features = ["fst_automaton"] } | ||||
| memmap = "0.7.0" | ||||
| once_cell = "1.4.0" | ||||
| oxidized-mtbl = { git = "https://github.com/Kerollmops/oxidized-mtbl.git", rev = "9451be8" } | ||||
| oxidized-mtbl = { git = "https://github.com/Kerollmops/oxidized-mtbl.git", rev = "6b8a3a8" } | ||||
| arc-cache = { git = "https://github.com/Kerollmops/rust-arc-cache.git", rev = "56530f2" } | ||||
| rayon = "1.3.1" | ||||
| roaring = { git = "https://github.com/Kerollmops/roaring-rs.git", branch = "mem-usage" } | ||||
| slice-group-by = "0.2.6" | ||||
|   | ||||
| @@ -1,24 +1,26 @@ | ||||
| use std::collections::hash_map::Entry; | ||||
| use std::collections::{HashMap, BTreeSet}; | ||||
| use std::collections::HashMap; | ||||
| use std::convert::{TryFrom, TryInto}; | ||||
| use std::fs::File; | ||||
| use std::mem; | ||||
| use std::iter::FromIterator; | ||||
| use std::path::PathBuf; | ||||
| use std::time::Instant; | ||||
|  | ||||
| use anyhow::Context; | ||||
| use arc_cache::ArcCache; | ||||
| use cow_utils::CowUtils; | ||||
| use fst::{Streamer, IntoStreamer}; | ||||
| use heed::EnvOpenOptions; | ||||
| use heed::types::*; | ||||
| use log::debug; | ||||
| use oxidized_mtbl::{Reader, ReaderOptions, Writer, Merger, MergerOptions}; | ||||
| use memmap::Mmap; | ||||
| use oxidized_mtbl::{Reader, Writer, Merger, Sorter, CompressionType}; | ||||
| use rayon::prelude::*; | ||||
| use roaring::RoaringBitmap; | ||||
| use slice_group_by::StrGroupBy; | ||||
| use structopt::StructOpt; | ||||
|  | ||||
| use milli::{FastMap4, SmallVec32, Index, DocumentId, Position}; | ||||
| use milli::{SmallVec32, Index, DocumentId, Position}; | ||||
|  | ||||
| const LMDB_MAX_KEY_LENGTH: usize = 512; | ||||
| const ONE_MILLION: usize = 1_000_000; | ||||
| @@ -26,6 +28,9 @@ const ONE_MILLION: usize = 1_000_000; | ||||
| const MAX_POSITION: usize = 1000; | ||||
| const MAX_ATTRIBUTES: usize = u32::max_value() as usize / MAX_POSITION; | ||||
|  | ||||
| const HEADERS_KEY: &[u8] = b"\0headers"; | ||||
| const WORDS_FST_KEY: &[u8] = b"\x06words-fst"; | ||||
|  | ||||
| #[cfg(target_os = "linux")] | ||||
| #[global_allocator] | ||||
| static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; | ||||
| @@ -47,14 +52,6 @@ struct Opt { | ||||
|     #[structopt(short, long)] | ||||
|     jobs: Option<usize>, | ||||
|  | ||||
|     /// Maximum number of bytes to allocate, will be divided by the number of | ||||
|     /// cores used. It is recommended to set a maximum of half of the available memory | ||||
|     /// as the current measurement method is really bad. | ||||
|     /// | ||||
|     /// The minimum amount of memory used will be 50MB anyway. | ||||
|     #[structopt(long, default_value = "4294967296")] | ||||
|     max_memory_usage: usize, | ||||
|  | ||||
|     /// Verbose mode (-v, -vv, -vvv, etc.) | ||||
|     #[structopt(short, long, parse(from_occurrences))] | ||||
|     verbose: usize, | ||||
| @@ -63,75 +60,88 @@ struct Opt { | ||||
|     csv_file: Option<PathBuf>, | ||||
| } | ||||
|  | ||||
| struct Indexed { | ||||
|     fst: fst::Set<Vec<u8>>, | ||||
|     word_positions: FastMap4<SmallVec32<u8>, RoaringBitmap>, | ||||
|     word_position_docids: FastMap4<(SmallVec32<u8>, Position), RoaringBitmap>, | ||||
|     headers: Vec<u8>, | ||||
|     documents: Vec<(DocumentId, Vec<u8>)>, | ||||
| type MergeFn = fn(&[u8], &[Vec<u8>]) -> Result<Vec<u8>, ()>; | ||||
|  | ||||
| struct Store { | ||||
|     word_positions: ArcCache<SmallVec32<u8>, RoaringBitmap>, | ||||
|     word_position_docids: ArcCache<(SmallVec32<u8>, Position), RoaringBitmap>, | ||||
|     sorter: Sorter<MergeFn>, | ||||
| } | ||||
|  | ||||
| impl Indexed { | ||||
|     fn new( | ||||
|         word_positions: FastMap4<SmallVec32<u8>, RoaringBitmap>, | ||||
|         word_position_docids: FastMap4<(SmallVec32<u8>, Position), RoaringBitmap>, | ||||
|         headers: Vec<u8>, | ||||
|         documents: Vec<(DocumentId, Vec<u8>)>, | ||||
|     ) -> anyhow::Result<Indexed> | ||||
| impl Store { | ||||
|     fn new() -> Store { | ||||
|         let sorter = Sorter::builder(merge as MergeFn) | ||||
|             .chunk_compression_type(CompressionType::Snappy) | ||||
|             .build(); | ||||
|  | ||||
|         Store { | ||||
|             word_positions: ArcCache::new(65_535), | ||||
|             word_position_docids: ArcCache::new(65_535), | ||||
|             sorter, | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     // Save the positions where this word has been seen. | ||||
|     pub fn insert_word_position(&mut self, word: &str, position: Position) -> anyhow::Result<()> { | ||||
|         let word = SmallVec32::from(word.as_bytes()); | ||||
|         let position = RoaringBitmap::from_iter(Some(position)); | ||||
|         let (_, lrus) = self.word_positions.insert(word, position, |old, new| old.union_with(&new)); | ||||
|         Self::write_word_positions(&mut self.sorter, lrus) | ||||
|     } | ||||
|  | ||||
|     // Save the documents ids under the position and word we have seen it. | ||||
|     pub fn insert_word_position_docid(&mut self, word: &str, position: Position, id: DocumentId) -> anyhow::Result<()> { | ||||
|         let word = SmallVec32::from(word.as_bytes()); | ||||
|         let ids = RoaringBitmap::from_iter(Some(id)); | ||||
|         let (_, lrus) = self.word_position_docids.insert((word, position), ids, |old, new| old.union_with(&new)); | ||||
|         Self::write_word_position_docids(&mut self.sorter, lrus) | ||||
|     } | ||||
|  | ||||
|     pub fn write_headers(&mut self, headers: &[u8]) -> anyhow::Result<()> { | ||||
|         Ok(self.sorter.insert(HEADERS_KEY, headers)?) | ||||
|     } | ||||
|  | ||||
|     pub fn write_document(&mut self, id: DocumentId, content: &[u8]) -> anyhow::Result<()> { | ||||
|         let id =  id.to_be_bytes(); | ||||
|         let mut key = Vec::with_capacity(1 + id.len()); | ||||
|  | ||||
|         // document keys are all prefixed by a '5' | ||||
|         key.push(5); | ||||
|         key.extend_from_slice(&id); | ||||
|  | ||||
|         Ok(self.sorter.insert(&key, content)?) | ||||
|     } | ||||
|  | ||||
|     fn write_word_positions<I>(sorter: &mut Sorter<MergeFn>, iter: I) -> anyhow::Result<()> | ||||
|     where I: IntoIterator<Item=(SmallVec32<u8>, RoaringBitmap)> | ||||
|     { | ||||
|         // We store the words from the postings. | ||||
|         let new_words: BTreeSet<_> = word_position_docids.iter().map(|((w, _), _)| w).collect(); | ||||
|         let fst = fst::Set::from_iter(new_words)?; | ||||
|         Ok(Indexed { fst, headers, word_positions, word_position_docids, documents }) | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[derive(Default)] | ||||
| struct MtblKvStore(Option<File>); | ||||
|  | ||||
| impl MtblKvStore { | ||||
|     fn from_indexed(mut indexed: Indexed) -> anyhow::Result<MtblKvStore> { | ||||
|         debug!("Creating an MTBL store from an Indexed..."); | ||||
|  | ||||
|         let outfile = tempfile::tempfile()?; | ||||
|         let mut out = Writer::new(outfile, None)?; | ||||
|  | ||||
|         out.add(b"\0headers", indexed.headers)?; | ||||
|         out.add(b"\0words-fst", indexed.fst.as_fst().as_bytes())?; | ||||
|  | ||||
|         // postings ids keys are all prefixed by a '1' | ||||
|         let mut key = vec![0]; | ||||
|         let mut key = vec![1]; | ||||
|         let mut buffer = Vec::new(); | ||||
|  | ||||
|         // We must write the postings attrs | ||||
|         key[0] = 1; | ||||
|         // We must write the postings ids in order for mtbl therefore | ||||
|         // we iterate over the fst to read the words in order | ||||
|         let mut stream = indexed.fst.stream(); | ||||
|         while let Some(word) = stream.next() { | ||||
|             if let Some(positions) = indexed.word_positions.get(word) { | ||||
|         for (word, positions) in iter { | ||||
|             key.truncate(1); | ||||
|                 key.extend_from_slice(word); | ||||
|             key.extend_from_slice(&word); | ||||
|             // We serialize the positions into a buffer | ||||
|             buffer.clear(); | ||||
|             positions.serialize_into(&mut buffer)?; | ||||
|             // that we write under the generated key into MTBL | ||||
|                 out.add(&key, &buffer).unwrap(); | ||||
|             } | ||||
|             sorter.insert(&key, &buffer)?; | ||||
|         } | ||||
|  | ||||
|         // We must write the postings ids | ||||
|         key[0] = 3; | ||||
|         // We must write the postings ids in order for mtbl therefore | ||||
|         // we iterate over the fst to read the words in order | ||||
|         let mut stream = indexed.fst.stream(); | ||||
|         while let Some(word) = stream.next() { | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     fn write_word_position_docids<I>(sorter: &mut Sorter<MergeFn>, iter: I) -> anyhow::Result<()> | ||||
|     where I: IntoIterator<Item=((SmallVec32<u8>, Position), RoaringBitmap)> | ||||
|     { | ||||
|         // postings positions ids keys are all prefixed by a '3' | ||||
|         let mut key = vec![3]; | ||||
|         let mut buffer = Vec::new(); | ||||
|  | ||||
|         for ((word, pos), ids) in iter { | ||||
|             key.truncate(1); | ||||
|             key.extend_from_slice(word); | ||||
|             if let Some(positions) = indexed.word_positions.remove(word) { | ||||
|                 // We iterate over all the attributes containing the documents ids | ||||
|                 for pos in positions { | ||||
|                     let ids = indexed.word_position_docids.remove(&(SmallVec32::from(word), pos)).unwrap(); | ||||
|             key.extend_from_slice(&word); | ||||
|             // we postfix the word by the positions it appears in | ||||
|             let position_bytes = pos.to_be_bytes(); | ||||
|             key.extend_from_slice(&position_bytes); | ||||
| @@ -139,30 +149,45 @@ impl MtblKvStore { | ||||
|             buffer.clear(); | ||||
|             ids.serialize_into(&mut buffer)?; | ||||
|             // that we write under the generated key into MTBL | ||||
|                     out.add(&key, &buffer).unwrap(); | ||||
|             sorter.insert(&key, &buffer)?; | ||||
|             // And cleanup the position afterward | ||||
|             key.truncate(key.len() - position_bytes.len()); | ||||
|         } | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     pub fn finish(mut self) -> anyhow::Result<Reader<Mmap>> { | ||||
|         Self::write_word_positions(&mut self.sorter, self.word_positions)?; | ||||
|         Self::write_word_position_docids(&mut self.sorter, self.word_position_docids)?; | ||||
|  | ||||
|         let mut wtr = tempfile::tempfile().map(Writer::new)?; | ||||
|         let mut builder = fst::SetBuilder::memory(); | ||||
|  | ||||
|         let mut iter = self.sorter.into_iter()?; | ||||
|         while let Some(result) = iter.next() { | ||||
|             let (key, val) = result?; | ||||
|             if let Some((&1, word)) = key.split_first() { | ||||
|                 // This is a lexicographically ordered word position | ||||
|                 // we use the key to construct the words fst. | ||||
|                 builder.insert(word)?; | ||||
|             } | ||||
|             wtr.insert(key, val)?; | ||||
|         } | ||||
|  | ||||
|         let fst = builder.into_set(); | ||||
|         wtr.insert(WORDS_FST_KEY, fst.as_fst().as_bytes())?; | ||||
|  | ||||
|         let file = wtr.into_inner()?; | ||||
|         let mmap = unsafe { Mmap::map(&file)? }; | ||||
|         let reader = Reader::new(mmap)?; | ||||
|  | ||||
|         Ok(reader) | ||||
|     } | ||||
| } | ||||
|  | ||||
|         // postings ids keys are all prefixed | ||||
|         key[0] = 5; | ||||
|         indexed.documents.sort_unstable_by_key(|(id, _)| *id); | ||||
|         for (id, content) in indexed.documents { | ||||
|             key.truncate(1); | ||||
|             key.extend_from_slice(&id.to_be_bytes()); | ||||
|             out.add(&key, content).unwrap(); | ||||
|         } | ||||
|  | ||||
|         let out = out.into_inner()?; | ||||
|  | ||||
|         debug!("MTBL store created!"); | ||||
|         Ok(MtblKvStore(Some(out))) | ||||
|     } | ||||
|  | ||||
|     fn merge(key: &[u8], values: &[Vec<u8>]) -> Option<Vec<u8>> { | ||||
|         if key == b"\0words-fst" { | ||||
| fn merge(key: &[u8], values: &[Vec<u8>]) -> Result<Vec<u8>, ()> { | ||||
|     if key == WORDS_FST_KEY { | ||||
|         let fsts: Vec<_> = values.iter().map(|v| fst::Set::new(v).unwrap()).collect(); | ||||
|  | ||||
|         // Union of the two FSTs | ||||
| @@ -172,11 +197,11 @@ impl MtblKvStore { | ||||
|  | ||||
|         let mut build = fst::SetBuilder::memory(); | ||||
|         build.extend_stream(op.into_stream()).unwrap(); | ||||
|             Some(build.into_inner().unwrap()) | ||||
|         Ok(build.into_inner().unwrap()) | ||||
|     } | ||||
|         else if key == b"\0headers" { | ||||
|     else if key == HEADERS_KEY { | ||||
|         assert!(values.windows(2).all(|vs| vs[0] == vs[1])); | ||||
|             Some(values[0].to_vec()) | ||||
|         Ok(values[0].to_vec()) | ||||
|     } | ||||
|     // We either merge postings attrs, prefix postings or postings ids. | ||||
|     else if key[0] == 1 || key[0] == 2 || key[0] == 3 || key[0] == 4 { | ||||
| @@ -189,159 +214,24 @@ impl MtblKvStore { | ||||
|  | ||||
|         let mut vec = Vec::new(); | ||||
|         first.serialize_into(&mut vec).unwrap(); | ||||
|             Some(vec) | ||||
|         Ok(vec) | ||||
|     } | ||||
|     else if key[0] == 5 { | ||||
|         assert!(values.windows(2).all(|vs| vs[0] == vs[1])); | ||||
|             Some(values[0].to_vec()) | ||||
|         Ok(values[0].to_vec()) | ||||
|     } | ||||
|     else { | ||||
|         panic!("wut? {:?}", key) | ||||
|     } | ||||
| } | ||||
|  | ||||
|     fn from_many<F>(stores: Vec<MtblKvStore>, mut f: F) -> anyhow::Result<()> | ||||
|     where F: FnMut(&[u8], &[u8]) -> anyhow::Result<()> | ||||
|     { | ||||
|         debug!("Merging {} MTBL stores...", stores.len()); | ||||
|         let before = Instant::now(); | ||||
|  | ||||
|         let mmaps: Vec<_> = stores.iter().flat_map(|m| { | ||||
|             m.0.as_ref().map(|f| unsafe { memmap::Mmap::map(f).unwrap() }) | ||||
|         }).collect(); | ||||
|  | ||||
|         let sources = mmaps.iter().map(|mmap| { | ||||
|             Reader::new(&mmap, ReaderOptions::default()).unwrap() | ||||
|         }).collect(); | ||||
|  | ||||
|         let opt = MergerOptions { merge: MtblKvStore::merge }; | ||||
|         let mut merger = Merger::new(sources, opt); | ||||
|  | ||||
|         let mut iter = merger.iter(); | ||||
|         while let Some((k, v)) = iter.next() { | ||||
|             (f)(k, v)?; | ||||
|         } | ||||
|  | ||||
|         debug!("MTBL stores merged in {:.02?}!", before.elapsed()); | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| fn mem_usage( | ||||
|     word_positions: &FastMap4<SmallVec32<u8>, RoaringBitmap>, | ||||
|     word_position_docids: &FastMap4<(SmallVec32<u8>, Position), RoaringBitmap>, | ||||
|     documents: &Vec<(u32, Vec<u8>)>, | ||||
| ) -> usize | ||||
| { | ||||
|     use std::mem::size_of; | ||||
|  | ||||
|     let documents = | ||||
|           documents.iter().map(|(_, d)| d.capacity()).sum::<usize>() | ||||
|         + documents.capacity() * size_of::<(Position, Vec<u8>)>(); | ||||
|  | ||||
|     let word_positions = | ||||
|           word_positions.iter().map(|(k, r)| { | ||||
|             (if k.spilled() { k.capacity() } else { 0 }) + r.mem_usage() | ||||
|           }).sum::<usize>() | ||||
|         + word_positions.capacity() * size_of::<(SmallVec32<u8>, RoaringBitmap)>(); | ||||
|  | ||||
|     let word_position_docids = | ||||
|           word_position_docids.iter().map(|((k, _), r)| { | ||||
|             (if k.spilled() { k.capacity() } else { 0 }) + r.mem_usage() | ||||
|           }).sum::<usize>() | ||||
|         + word_position_docids.capacity() * size_of::<((SmallVec32<u8>, Position), RoaringBitmap)>(); | ||||
|  | ||||
|     documents + word_positions + word_position_docids | ||||
| } | ||||
|  | ||||
| fn index_csv( | ||||
|     mut rdr: csv::Reader<File>, | ||||
|     thread_index: usize, | ||||
|     num_threads: usize, | ||||
|     max_mem_usage: usize, | ||||
| ) -> anyhow::Result<Vec<MtblKvStore>> | ||||
| { | ||||
|     debug!("{:?}: Indexing into an Indexed...", thread_index); | ||||
|  | ||||
|     let mut stores = Vec::new(); | ||||
|  | ||||
|     let mut word_positions = FastMap4::default(); | ||||
|     let mut word_position_docids = FastMap4::default(); | ||||
|     let mut documents = Vec::new(); | ||||
|  | ||||
|     // Write the headers into a Vec of bytes. | ||||
|     let headers = rdr.headers()?; | ||||
|     let mut writer = csv::WriterBuilder::new().has_headers(false).from_writer(Vec::new()); | ||||
|     writer.write_byte_record(headers.as_byte_record())?; | ||||
|     let headers = writer.into_inner()?; | ||||
|  | ||||
|     let mut document_id: usize = 0; | ||||
|     let mut document = csv::StringRecord::new(); | ||||
|     while rdr.read_record(&mut document)? { | ||||
|         document_id = document_id + 1; | ||||
|  | ||||
|         // We skip documents that must not be indexed by this thread | ||||
|         if document_id % num_threads != thread_index { continue } | ||||
|  | ||||
|         let document_id = DocumentId::try_from(document_id).context("generated id is too big")?; | ||||
|  | ||||
|         if document_id % (ONE_MILLION as u32) == 0 { | ||||
|             debug!("We have seen {}m documents so far.", document_id / ONE_MILLION as u32); | ||||
|         } | ||||
|  | ||||
|         for (attr, content) in document.iter().enumerate().take(MAX_ATTRIBUTES) { | ||||
|             for (pos, word) in simple_alphanumeric_tokens(&content).enumerate().take(MAX_POSITION) { | ||||
|                 if !word.is_empty() && word.len() < LMDB_MAX_KEY_LENGTH { | ||||
|                     let word = word.cow_to_lowercase(); | ||||
|                     let position = (attr * MAX_POSITION + pos) as u32; | ||||
|  | ||||
|                     // We save the positions where this word has been seen. | ||||
|                     word_positions.entry(SmallVec32::from(word.as_bytes())) | ||||
|                         .or_insert_with(RoaringBitmap::new).insert(position); | ||||
|  | ||||
|                     // We save the documents ids under the position and word we have seen it. | ||||
|                     word_position_docids.entry((SmallVec32::from(word.as_bytes()), position)) // word + position | ||||
|                         .or_insert_with(RoaringBitmap::new).insert(document_id); // document ids | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         // We write the document in the database. | ||||
|         let mut writer = csv::WriterBuilder::new().has_headers(false).from_writer(Vec::new()); | ||||
|         writer.write_byte_record(document.as_byte_record())?; | ||||
|         let document = writer.into_inner()?; | ||||
|         documents.push((document_id, document)); | ||||
|  | ||||
|         if documents.len() % 100_000 == 0 { | ||||
|             let usage = mem_usage(&word_positions, &word_position_docids, &documents); | ||||
|             if usage > max_mem_usage { | ||||
|                 debug!("Whoops too much memory used ({}B).", usage); | ||||
|  | ||||
|                 let word_positions = mem::take(&mut word_positions); | ||||
|                 let word_position_docids = mem::take(&mut word_position_docids); | ||||
|                 let documents = mem::take(&mut documents); | ||||
|  | ||||
|                 let indexed = Indexed::new(word_positions, word_position_docids, headers.clone(), documents)?; | ||||
|                 debug!("{:?}: Indexed created!", thread_index); | ||||
|                 stores.push(MtblKvStore::from_indexed(indexed)?); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     let indexed = Indexed::new(word_positions, word_position_docids, headers, documents)?; | ||||
|     debug!("{:?}: Indexed created!", thread_index); | ||||
|     stores.push(MtblKvStore::from_indexed(indexed)?); | ||||
|  | ||||
|     Ok(stores) | ||||
| } | ||||
|  | ||||
| // TODO merge with the previous values | ||||
| fn writer(wtxn: &mut heed::RwTxn, index: &Index, key: &[u8], val: &[u8]) -> anyhow::Result<()> { | ||||
|     if key == b"\0words-fst" { | ||||
| fn lmdb_writer(wtxn: &mut heed::RwTxn, index: &Index, key: &[u8], val: &[u8]) -> anyhow::Result<()> { | ||||
|     if key == WORDS_FST_KEY { | ||||
|         // Write the words fst | ||||
|         index.main.put::<_, Str, ByteSlice>(wtxn, "words-fst", val)?; | ||||
|     } | ||||
|     else if key == b"\0headers" { | ||||
|     else if key == HEADERS_KEY { | ||||
|         // Write the headers | ||||
|         index.main.put::<_, Str, ByteSlice>(wtxn, "headers", val)?; | ||||
|     } | ||||
| @@ -374,6 +264,81 @@ fn writer(wtxn: &mut heed::RwTxn, index: &Index, key: &[u8], val: &[u8]) -> anyh | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| fn merge_into_lmdb<F>(sources: Vec<Reader<Mmap>>, mut f: F) -> anyhow::Result<()> | ||||
| where F: FnMut(&[u8], &[u8]) -> anyhow::Result<()> | ||||
| { | ||||
|     debug!("Merging {} MTBL stores...", sources.len()); | ||||
|     let before = Instant::now(); | ||||
|  | ||||
|     let mut builder = Merger::builder(merge); | ||||
|     builder.extend(sources); | ||||
|     let merger = builder.build(); | ||||
|  | ||||
|     let mut iter = merger.into_merge_iter()?; | ||||
|     while let Some(result) = iter.next() { | ||||
|         let (k, v) = result?; | ||||
|         (f)(&k, &v)?; | ||||
|     } | ||||
|  | ||||
|     debug!("MTBL stores merged in {:.02?}!", before.elapsed()); | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| fn index_csv( | ||||
|     mut rdr: csv::Reader<File>, | ||||
|     thread_index: usize, | ||||
|     num_threads: usize, | ||||
| ) -> anyhow::Result<Reader<Mmap>> | ||||
| { | ||||
|     debug!("{:?}: Indexing into an Indexed...", thread_index); | ||||
|  | ||||
|     let mut store = Store::new(); | ||||
|  | ||||
|     // Write the headers into a Vec of bytes and then into the store. | ||||
|     let headers = rdr.headers()?; | ||||
|     let mut writer = csv::WriterBuilder::new().has_headers(false).from_writer(Vec::new()); | ||||
|     writer.write_byte_record(headers.as_byte_record())?; | ||||
|     let headers = writer.into_inner()?; | ||||
|     store.write_headers(&headers)?; | ||||
|  | ||||
|     let mut document_id: usize = 0; | ||||
|     let mut document = csv::StringRecord::new(); | ||||
|     while rdr.read_record(&mut document)? { | ||||
|         document_id = document_id + 1; | ||||
|  | ||||
|         // We skip documents that must not be indexed by this thread | ||||
|         if document_id % num_threads != thread_index { continue } | ||||
|  | ||||
|         let document_id = DocumentId::try_from(document_id).context("generated id is too big")?; | ||||
|  | ||||
|         if document_id % (ONE_MILLION as u32) == 0 { | ||||
|             debug!("We have seen {}m documents so far.", document_id / ONE_MILLION as u32); | ||||
|         } | ||||
|  | ||||
|         for (attr, content) in document.iter().enumerate().take(MAX_ATTRIBUTES) { | ||||
|             for (pos, word) in simple_alphanumeric_tokens(&content).enumerate().take(MAX_POSITION) { | ||||
|                 if !word.is_empty() && word.len() < LMDB_MAX_KEY_LENGTH { | ||||
|                     let word = word.cow_to_lowercase(); | ||||
|                     let position = (attr * MAX_POSITION + pos) as u32; | ||||
|                     store.insert_word_position(&word, position)?; | ||||
|                     store.insert_word_position_docid(&word, position, document_id)?; | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         // We write the document in the database. | ||||
|         let mut writer = csv::WriterBuilder::new().has_headers(false).from_writer(Vec::new()); | ||||
|         writer.write_byte_record(document.as_byte_record())?; | ||||
|         let document = writer.into_inner()?; | ||||
|         store.write_document(document_id, &document)?; | ||||
|     } | ||||
|  | ||||
|     let reader = store.finish()?; | ||||
|     debug!("{:?}: Store created!", thread_index); | ||||
|     Ok(reader) | ||||
| } | ||||
|  | ||||
| // TODO do that in the threads. | ||||
| fn compute_words_attributes_docids(wtxn: &mut heed::RwTxn, index: &Index) -> anyhow::Result<()> { | ||||
|     let before = Instant::now(); | ||||
|  | ||||
| @@ -441,7 +406,6 @@ fn main() -> anyhow::Result<()> { | ||||
|     let index = Index::new(&env)?; | ||||
|  | ||||
|     let num_threads = rayon::current_num_threads(); | ||||
|     let max_memory_usage = (opt.max_memory_usage / num_threads).max(50 * 1024 * 1024); // 50MB | ||||
|  | ||||
|     // We duplicate the file # jobs times. | ||||
|     let file = opt.csv_file.unwrap(); | ||||
| @@ -450,15 +414,13 @@ fn main() -> anyhow::Result<()> { | ||||
|     let stores: Vec<_> = csv_readers | ||||
|         .into_par_iter() | ||||
|         .enumerate() | ||||
|         .map(|(i, rdr)| index_csv(rdr, i, num_threads, max_memory_usage)) | ||||
|         .map(|(i, rdr)| index_csv(rdr, i, num_threads)) | ||||
|         .collect::<Result<_, _>>()?; | ||||
|  | ||||
|     let stores: Vec<_> = stores.into_iter().flatten().collect(); | ||||
|  | ||||
|     debug!("We are writing into LMDB..."); | ||||
|     let mut wtxn = env.write_txn()?; | ||||
|  | ||||
|     MtblKvStore::from_many(stores, |k, v| writer(&mut wtxn, &index, k, v))?; | ||||
|     merge_into_lmdb(stores, |k, v| lmdb_writer(&mut wtxn, &index, k, v))?; | ||||
|     compute_words_attributes_docids(&mut wtxn, &index)?; | ||||
|     let count = index.documents.len(&wtxn)?; | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user