mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-26 05:26:27 +00:00 
			
		
		
		
	Put the documents into an MTBL database
This commit is contained in:
		
							
								
								
									
										2
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										2
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							| @@ -1216,7 +1216,7 @@ checksum = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c" | |||||||
| [[package]] | [[package]] | ||||||
| name = "oxidized-mtbl" | name = "oxidized-mtbl" | ||||||
| version = "0.1.0" | version = "0.1.0" | ||||||
| source = "git+https://github.com/Kerollmops/oxidized-mtbl.git?rev=6b8a3a8#6b8a3a83a8b83bfdba38f7ea67bfa5868e668741" | source = "git+https://github.com/Kerollmops/oxidized-mtbl.git?rev=13294cc#13294ccd73c9d6f71645a3ed2852656f3c86d31d" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "byteorder", |  "byteorder", | ||||||
|  "crc32c", |  "crc32c", | ||||||
|   | |||||||
| @@ -1,3 +1,4 @@ | |||||||
|  | use std::convert::TryInto; | ||||||
| use std::convert::TryFrom; | use std::convert::TryFrom; | ||||||
| use std::fs::File; | use std::fs::File; | ||||||
| use std::io::{self, Read, Write}; | use std::io::{self, Read, Write}; | ||||||
| @@ -30,10 +31,10 @@ const MAX_ATTRIBUTES: usize = u32::max_value() as usize / MAX_POSITION; | |||||||
|  |  | ||||||
| const HEADERS_KEY: &[u8] = b"\0headers"; | const HEADERS_KEY: &[u8] = b"\0headers"; | ||||||
| const WORDS_FST_KEY: &[u8] = b"\x05words-fst"; | const WORDS_FST_KEY: &[u8] = b"\x05words-fst"; | ||||||
|  | const DOCUMENTS_KEY: &[u8] = b"\x06documents"; | ||||||
| const WORD_POSITIONS_BYTE: u8 = 1; | const WORD_POSITIONS_BYTE: u8 = 1; | ||||||
| const WORD_POSITION_DOCIDS_BYTE: u8 = 2; | const WORD_POSITION_DOCIDS_BYTE: u8 = 2; | ||||||
| const WORD_ATTRIBUTE_DOCIDS_BYTE: u8 = 3; | const WORD_ATTRIBUTE_DOCIDS_BYTE: u8 = 3; | ||||||
| const DOCUMENT_BYTE: u8 = 4; |  | ||||||
|  |  | ||||||
| #[cfg(target_os = "linux")] | #[cfg(target_os = "linux")] | ||||||
| #[global_allocator] | #[global_allocator] | ||||||
| @@ -88,22 +89,23 @@ struct Store { | |||||||
|     word_position_docids: ArcCache<(SmallVec32<u8>, Position), RoaringBitmap>, |     word_position_docids: ArcCache<(SmallVec32<u8>, Position), RoaringBitmap>, | ||||||
|     word_attribute_docids: ArcCache<(SmallVec32<u8>, Attribute), RoaringBitmap>, |     word_attribute_docids: ArcCache<(SmallVec32<u8>, Attribute), RoaringBitmap>, | ||||||
|     sorter: Sorter<MergeFn>, |     sorter: Sorter<MergeFn>, | ||||||
|  |     documents_sorter: Sorter<MergeFn>, | ||||||
| } | } | ||||||
|  |  | ||||||
| impl Store { | impl Store { | ||||||
|     fn new(arc_cache_size: Option<usize>, max_nb_chunks: Option<usize>, max_memory: Option<usize>) -> Store { |     fn new(arc_cache_size: Option<usize>, max_nb_chunks: Option<usize>, max_memory: Option<usize>) -> Store { | ||||||
|         let mut builder = Sorter::builder(merge as MergeFn); |         let mut builder = Sorter::builder(merge as MergeFn); | ||||||
|  |  | ||||||
|         builder.chunk_compression_type(CompressionType::Snappy); |         builder.chunk_compression_type(CompressionType::Snappy); | ||||||
|  |  | ||||||
|         if let Some(nb_chunks) = max_nb_chunks { |         if let Some(nb_chunks) = max_nb_chunks { | ||||||
|             builder.max_nb_chunks(nb_chunks); |             builder.max_nb_chunks(nb_chunks); | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         if let Some(memory) = max_memory { |         if let Some(memory) = max_memory { | ||||||
|             builder.max_memory(memory); |             builder.max_memory(memory); | ||||||
|         } |         } | ||||||
|  |  | ||||||
|  |         let mut documents_builder = Sorter::builder(docs_merge as MergeFn); | ||||||
|  |         documents_builder.chunk_compression_type(CompressionType::Snappy); | ||||||
|  |  | ||||||
|         let arc_cache_size = arc_cache_size.unwrap_or(65_535); |         let arc_cache_size = arc_cache_size.unwrap_or(65_535); | ||||||
|  |  | ||||||
|         Store { |         Store { | ||||||
| @@ -111,6 +113,7 @@ impl Store { | |||||||
|             word_position_docids: ArcCache::new(arc_cache_size), |             word_position_docids: ArcCache::new(arc_cache_size), | ||||||
|             word_attribute_docids: ArcCache::new(arc_cache_size), |             word_attribute_docids: ArcCache::new(arc_cache_size), | ||||||
|             sorter: builder.build(), |             sorter: builder.build(), | ||||||
|  |             documents_sorter: documents_builder.build(), | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @@ -144,13 +147,7 @@ impl Store { | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn write_document(&mut self, id: DocumentId, content: &[u8]) -> anyhow::Result<()> { |     pub fn write_document(&mut self, id: DocumentId, content: &[u8]) -> anyhow::Result<()> { | ||||||
|         let id =  id.to_be_bytes(); |         Ok(self.documents_sorter.insert(id.to_be_bytes(), content)?) | ||||||
|         let mut key = Vec::with_capacity(1 + id.len()); |  | ||||||
|  |  | ||||||
|         key.push(DOCUMENT_BYTE); |  | ||||||
|         key.extend_from_slice(&id); |  | ||||||
|  |  | ||||||
|         Ok(self.sorter.insert(&key, content)?) |  | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     fn write_word_positions<I>(sorter: &mut Sorter<MergeFn>, iter: I) -> anyhow::Result<()> |     fn write_word_positions<I>(sorter: &mut Sorter<MergeFn>, iter: I) -> anyhow::Result<()> | ||||||
| @@ -245,6 +242,12 @@ impl Store { | |||||||
|         let fst = builder.into_set(); |         let fst = builder.into_set(); | ||||||
|         wtr.insert(WORDS_FST_KEY, fst.as_fst().as_bytes())?; |         wtr.insert(WORDS_FST_KEY, fst.as_fst().as_bytes())?; | ||||||
|  |  | ||||||
|  |         let mut docs_wtr = tempfile::tempfile().map(Writer::new)?; | ||||||
|  |         self.documents_sorter.write_into(&mut docs_wtr)?; | ||||||
|  |         let docs_file = docs_wtr.into_inner()?; | ||||||
|  |         let docs_mmap = unsafe { Mmap::map(&docs_file)? }; | ||||||
|  |         wtr.insert(DOCUMENTS_KEY, docs_mmap)?; | ||||||
|  |  | ||||||
|         let file = wtr.into_inner()?; |         let file = wtr.into_inner()?; | ||||||
|         let mmap = unsafe { Mmap::map(&file)? }; |         let mmap = unsafe { Mmap::map(&file)? }; | ||||||
|         let reader = Reader::new(mmap)?; |         let reader = Reader::new(mmap)?; | ||||||
| @@ -253,6 +256,12 @@ impl Store { | |||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | fn docs_merge(key: &[u8], values: &[Vec<u8>]) -> Result<Vec<u8>, ()> { | ||||||
|  |     let key = key.try_into().unwrap(); | ||||||
|  |     let id = u32::from_be_bytes(key); | ||||||
|  |     panic!("documents must not conflict ({} with {} values)!", id, values.len()) | ||||||
|  | } | ||||||
|  |  | ||||||
| fn merge(key: &[u8], values: &[Vec<u8>]) -> Result<Vec<u8>, ()> { | fn merge(key: &[u8], values: &[Vec<u8>]) -> Result<Vec<u8>, ()> { | ||||||
|     match key { |     match key { | ||||||
|         WORDS_FST_KEY => { |         WORDS_FST_KEY => { | ||||||
| @@ -271,6 +280,20 @@ fn merge(key: &[u8], values: &[Vec<u8>]) -> Result<Vec<u8>, ()> { | |||||||
|             assert!(values.windows(2).all(|vs| vs[0] == vs[1])); |             assert!(values.windows(2).all(|vs| vs[0] == vs[1])); | ||||||
|             Ok(values[0].to_vec()) |             Ok(values[0].to_vec()) | ||||||
|         }, |         }, | ||||||
|  |         DOCUMENTS_KEY => { | ||||||
|  |             let sources: Vec<_> = values.iter().map(Reader::new).collect::<Result<_, _>>().unwrap(); | ||||||
|  |  | ||||||
|  |             let mut builder = Merger::builder(docs_merge); | ||||||
|  |             builder.extend(sources); | ||||||
|  |             let merger = builder.build(); | ||||||
|  |  | ||||||
|  |             let mut builder = Writer::builder(); | ||||||
|  |             builder.compression_type(CompressionType::Snappy); | ||||||
|  |  | ||||||
|  |             let mut wtr = builder.memory(); | ||||||
|  |             merger.write_into(&mut wtr).unwrap(); | ||||||
|  |             Ok(wtr.into_inner().unwrap()) | ||||||
|  |         }, | ||||||
|         key => match key[0] { |         key => match key[0] { | ||||||
|               WORD_POSITIONS_BYTE | WORD_POSITION_DOCIDS_BYTE | WORD_ATTRIBUTE_DOCIDS_BYTE => { |               WORD_POSITIONS_BYTE | WORD_POSITION_DOCIDS_BYTE | WORD_ATTRIBUTE_DOCIDS_BYTE => { | ||||||
|                 let mut first = RoaringBitmap::deserialize_from(values[0].as_slice()).unwrap(); |                 let mut first = RoaringBitmap::deserialize_from(values[0].as_slice()).unwrap(); | ||||||
| @@ -284,10 +307,6 @@ fn merge(key: &[u8], values: &[Vec<u8>]) -> Result<Vec<u8>, ()> { | |||||||
|                 first.serialize_into(&mut vec).unwrap(); |                 first.serialize_into(&mut vec).unwrap(); | ||||||
|                 Ok(vec) |                 Ok(vec) | ||||||
|             }, |             }, | ||||||
|             DOCUMENT_BYTE => { |  | ||||||
|                 assert!(values.windows(2).all(|vs| vs[0] == vs[1])); |  | ||||||
|                 Ok(values[0].to_vec()) |  | ||||||
|             }, |  | ||||||
|             otherwise => panic!("wut {:?}", otherwise), |             otherwise => panic!("wut {:?}", otherwise), | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| @@ -304,6 +323,10 @@ fn lmdb_writer(wtxn: &mut heed::RwTxn, index: &Index, key: &[u8], val: &[u8]) -> | |||||||
|         // Write the headers |         // Write the headers | ||||||
|         index.main.put::<_, Str, ByteSlice>(wtxn, "headers", val)?; |         index.main.put::<_, Str, ByteSlice>(wtxn, "headers", val)?; | ||||||
|     } |     } | ||||||
|  |     else if key == DOCUMENTS_KEY { | ||||||
|  |         // Write the documents | ||||||
|  |         index.main.put::<_, Str, ByteSlice>(wtxn, "documents", val)?; | ||||||
|  |     } | ||||||
|     else if key.starts_with(&[WORD_POSITIONS_BYTE]) { |     else if key.starts_with(&[WORD_POSITIONS_BYTE]) { | ||||||
|         // Write the postings lists |         // Write the postings lists | ||||||
|         index.word_positions.as_polymorph() |         index.word_positions.as_polymorph() | ||||||
| @@ -319,11 +342,6 @@ fn lmdb_writer(wtxn: &mut heed::RwTxn, index: &Index, key: &[u8], val: &[u8]) -> | |||||||
|         index.word_attribute_docids.as_polymorph() |         index.word_attribute_docids.as_polymorph() | ||||||
|             .put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?; |             .put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?; | ||||||
|     } |     } | ||||||
|     else if key.starts_with(&[DOCUMENT_BYTE]) { |  | ||||||
|         // Write the documents |  | ||||||
|         index.documents.as_polymorph() |  | ||||||
|             .put::<_, ByteSlice, ByteSlice>(wtxn, &key[1..], val)?; |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     Ok(()) |     Ok(()) | ||||||
| } | } | ||||||
| @@ -357,7 +375,7 @@ fn index_csv( | |||||||
|     max_memory: Option<usize>, |     max_memory: Option<usize>, | ||||||
| ) -> anyhow::Result<Reader<Mmap>> | ) -> anyhow::Result<Reader<Mmap>> | ||||||
| { | { | ||||||
|     debug!("{:?}: Indexing into an Indexed...", thread_index); |     debug!("{:?}: Indexing into a Store...", thread_index); | ||||||
|  |  | ||||||
|     let mut store = Store::new(arc_cache_size, max_nb_chunks, max_memory); |     let mut store = Store::new(arc_cache_size, max_nb_chunks, max_memory); | ||||||
|  |  | ||||||
| @@ -480,7 +498,7 @@ fn main() -> anyhow::Result<()> { | |||||||
|     let mut wtxn = env.write_txn()?; |     let mut wtxn = env.write_txn()?; | ||||||
|  |  | ||||||
|     merge_into_lmdb(stores, |k, v| lmdb_writer(&mut wtxn, &index, k, v))?; |     merge_into_lmdb(stores, |k, v| lmdb_writer(&mut wtxn, &index, k, v))?; | ||||||
|     let count = index.documents.len(&wtxn)?; |     let count = index.documents(&wtxn)?.unwrap().metadata().count_entries; | ||||||
|  |  | ||||||
|     wtxn.commit()?; |     wtxn.commit()?; | ||||||
|     debug!("Wrote {} documents into LMDB", count); |     debug!("Wrote {} documents into LMDB", count); | ||||||
|   | |||||||
| @@ -5,7 +5,7 @@ use std::time::Instant; | |||||||
|  |  | ||||||
| use heed::EnvOpenOptions; | use heed::EnvOpenOptions; | ||||||
| use log::debug; | use log::debug; | ||||||
| use milli::{Index, BEU32}; | use milli::Index; | ||||||
| use structopt::StructOpt; | use structopt::StructOpt; | ||||||
|  |  | ||||||
| #[cfg(target_os = "linux")] | #[cfg(target_os = "linux")] | ||||||
| @@ -67,9 +67,11 @@ fn main() -> anyhow::Result<()> { | |||||||
|         let mut stdout = io::stdout(); |         let mut stdout = io::stdout(); | ||||||
|         stdout.write_all(&headers)?; |         stdout.write_all(&headers)?; | ||||||
|  |  | ||||||
|  |         let documents = index.documents(&rtxn)?.unwrap(); | ||||||
|         for id in &documents_ids { |         for id in &documents_ids { | ||||||
|             if let Some(content) = index.documents.get(&rtxn, &BEU32::new(*id))? { |             let id_bytes = id.to_be_bytes(); | ||||||
|                 stdout.write_all(&content)?; |             if let Some(content) = documents.clone().get(&id_bytes)? { | ||||||
|  |                 stdout.write_all(content.as_ref())?; | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -13,7 +13,7 @@ use slice_group_by::StrGroupBy; | |||||||
| use structopt::StructOpt; | use structopt::StructOpt; | ||||||
| use warp::{Filter, http::Response}; | use warp::{Filter, http::Response}; | ||||||
|  |  | ||||||
| use milli::{BEU32, Index}; | use milli::Index; | ||||||
|  |  | ||||||
| #[cfg(target_os = "linux")] | #[cfg(target_os = "linux")] | ||||||
| #[global_allocator] | #[global_allocator] | ||||||
| @@ -87,7 +87,7 @@ async fn main() -> anyhow::Result<()> { | |||||||
|     // the disk file size and the number of documents in the database. |     // the disk file size and the number of documents in the database. | ||||||
|     let db_name = opt.database.file_stem().and_then(|s| s.to_str()).unwrap_or("").to_string(); |     let db_name = opt.database.file_stem().and_then(|s| s.to_str()).unwrap_or("").to_string(); | ||||||
|     let db_size = File::open(opt.database.join("data.mdb"))?.metadata()?.len() as usize; |     let db_size = File::open(opt.database.join("data.mdb"))?.metadata()?.len() as usize; | ||||||
|     let docs_count = env.read_txn().and_then(|r| index.documents.len(&r))?; |     let docs_count = env.read_txn().and_then(|r| Ok(index.documents(&r).unwrap().unwrap().metadata().count_entries))?; | ||||||
|  |  | ||||||
|     // We run and wait on the HTTP server |     // We run and wait on the HTTP server | ||||||
|  |  | ||||||
| @@ -98,7 +98,7 @@ async fn main() -> anyhow::Result<()> { | |||||||
|             IndexTemplate { |             IndexTemplate { | ||||||
|                 db_name: db_name.clone(), |                 db_name: db_name.clone(), | ||||||
|                 db_size, |                 db_size, | ||||||
|                 docs_count, |                 docs_count: docs_count as usize, | ||||||
|             } |             } | ||||||
|         }); |         }); | ||||||
|  |  | ||||||
| @@ -185,11 +185,13 @@ async fn main() -> anyhow::Result<()> { | |||||||
|             if let Some(headers) = index.headers(&rtxn).unwrap() { |             if let Some(headers) = index.headers(&rtxn).unwrap() { | ||||||
|                 // We write the headers |                 // We write the headers | ||||||
|                 body.extend_from_slice(headers); |                 body.extend_from_slice(headers); | ||||||
|  |                 let documents = index.documents(&rtxn).unwrap().unwrap(); | ||||||
|  |  | ||||||
|                 for id in documents_ids { |                 for id in documents_ids { | ||||||
|                     let content = index.documents.get(&rtxn, &BEU32::new(id)).unwrap(); |                     let id_bytes = id.to_be_bytes(); | ||||||
|  |                     let content = documents.clone().get(&id_bytes).unwrap(); | ||||||
|                     let content = content.expect(&format!("could not find document {}", id)); |                     let content = content.expect(&format!("could not find document {}", id)); | ||||||
|                     let content = std::str::from_utf8(content).unwrap(); |                     let content = std::str::from_utf8(content.as_ref()).unwrap(); | ||||||
|  |  | ||||||
|                     let content = if disable_highlighting { |                     let content = if disable_highlighting { | ||||||
|                         Cow::from(content) |                         Cow::from(content) | ||||||
|   | |||||||
							
								
								
									
										11
									
								
								src/lib.rs
									
									
									
									
									
								
							
							
						
						
									
										11
									
								
								src/lib.rs
									
									
									
									
									
								
							| @@ -16,6 +16,7 @@ use heed::{PolyDatabase, Database}; | |||||||
| use levenshtein_automata::LevenshteinAutomatonBuilder as LevBuilder; | use levenshtein_automata::LevenshteinAutomatonBuilder as LevBuilder; | ||||||
| use log::debug; | use log::debug; | ||||||
| use once_cell::sync::Lazy; | use once_cell::sync::Lazy; | ||||||
|  | use oxidized_mtbl::Reader; | ||||||
| use roaring::RoaringBitmap; | use roaring::RoaringBitmap; | ||||||
|  |  | ||||||
| use self::best_proximity::BestProximity; | use self::best_proximity::BestProximity; | ||||||
| @@ -49,8 +50,6 @@ pub struct Index { | |||||||
|     pub prefix_word_position_docids: Database<ByteSlice, RoaringBitmapCodec>, |     pub prefix_word_position_docids: Database<ByteSlice, RoaringBitmapCodec>, | ||||||
|     /// Maps a word and an attribute (u32) to all the documents ids that it appears in. |     /// Maps a word and an attribute (u32) to all the documents ids that it appears in. | ||||||
|     pub word_attribute_docids: Database<ByteSlice, RoaringBitmapCodec>, |     pub word_attribute_docids: Database<ByteSlice, RoaringBitmapCodec>, | ||||||
|     /// Maps an internal document to the content of the document in CSV. |  | ||||||
|     pub documents: Database<OwnedType<BEU32>, ByteSlice>, |  | ||||||
| } | } | ||||||
|  |  | ||||||
| impl Index { | impl Index { | ||||||
| @@ -62,7 +61,6 @@ impl Index { | |||||||
|             word_position_docids: env.create_database(Some("word-position-docids"))?, |             word_position_docids: env.create_database(Some("word-position-docids"))?, | ||||||
|             prefix_word_position_docids: env.create_database(Some("prefix-word-position-docids"))?, |             prefix_word_position_docids: env.create_database(Some("prefix-word-position-docids"))?, | ||||||
|             word_attribute_docids: env.create_database(Some("word-attribute-docids"))?, |             word_attribute_docids: env.create_database(Some("word-attribute-docids"))?, | ||||||
|             documents: env.create_database(Some("documents"))?, |  | ||||||
|         }) |         }) | ||||||
|     } |     } | ||||||
|  |  | ||||||
| @@ -74,6 +72,13 @@ impl Index { | |||||||
|         self.main.get::<_, Str, ByteSlice>(rtxn, "headers") |         self.main.get::<_, Str, ByteSlice>(rtxn, "headers") | ||||||
|     } |     } | ||||||
|  |  | ||||||
|  |     pub fn documents<'t>(&self, rtxn: &'t heed::RoTxn) -> anyhow::Result<Option<Reader<&'t [u8]>>> { | ||||||
|  |         match self.main.get::<_, Str, ByteSlice>(rtxn, "documents")? { | ||||||
|  |             Some(bytes) => Ok(Some(Reader::new(bytes)?)), | ||||||
|  |             None => Ok(None), | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|     pub fn number_of_attributes<'t>(&self, rtxn: &'t heed::RoTxn) -> anyhow::Result<Option<usize>> { |     pub fn number_of_attributes<'t>(&self, rtxn: &'t heed::RoTxn) -> anyhow::Result<Option<usize>> { | ||||||
|         match self.headers(rtxn)? { |         match self.headers(rtxn)? { | ||||||
|             Some(headers) => { |             Some(headers) => { | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user