mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-24 20:46:27 +00:00 
			
		
		
		
	Merge pull request #39 from meilisearch/speedup-documents-ids-merging
Speedup documents ids merging
This commit is contained in:
		| @@ -605,14 +605,14 @@ async fn main() -> anyhow::Result<()> { | |||||||
|             let index = index_cloned.clone(); |             let index = index_cloned.clone(); | ||||||
|             let rtxn = index.read_txn().unwrap(); |             let rtxn = index.read_txn().unwrap(); | ||||||
|  |  | ||||||
|             let users_ids_documents_ids = index.users_ids_documents_ids(&rtxn).unwrap(); |             let external_documents_ids = index.external_documents_ids(&rtxn).unwrap(); | ||||||
|             let fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); |             let fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); | ||||||
|             let displayed_fields = match index.displayed_fields(&rtxn).unwrap() { |             let displayed_fields = match index.displayed_fields(&rtxn).unwrap() { | ||||||
|                 Some(fields) => Cow::Borrowed(fields), |                 Some(fields) => Cow::Borrowed(fields), | ||||||
|                 None => Cow::Owned(fields_ids_map.iter().map(|(id, _)| id).collect()), |                 None => Cow::Owned(fields_ids_map.iter().map(|(id, _)| id).collect()), | ||||||
|             }; |             }; | ||||||
|  |  | ||||||
|             match users_ids_documents_ids.get(&id) { |             match external_documents_ids.get(&id) { | ||||||
|                 Some(document_id) => { |                 Some(document_id) => { | ||||||
|                     let document_id = document_id as u32; |                     let document_id = document_id as u32; | ||||||
|                     let (_, obkv) = index.documents(&rtxn, Some(document_id)).unwrap().pop().unwrap(); |                     let (_, obkv) = index.documents(&rtxn, Some(document_id)).unwrap().pop().unwrap(); | ||||||
|   | |||||||
							
								
								
									
										156
									
								
								src/external_documents_ids.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										156
									
								
								src/external_documents_ids.rs
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,156 @@ | |||||||
|  | use std::borrow::Cow; | ||||||
|  | use std::convert::TryInto; | ||||||
|  | use fst::{Streamer, IntoStreamer}; | ||||||
|  |  | ||||||
|  | pub struct ExternalDocumentsIds<'a> { | ||||||
|  |     pub(crate) hard: fst::Map<Cow<'a, [u8]>>, | ||||||
|  |     pub(crate) soft: fst::Map<Cow<'a, [u8]>>, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl<'a> ExternalDocumentsIds<'a> { | ||||||
|  |     pub fn new(hard: fst::Map<Cow<'a, [u8]>>, soft: fst::Map<Cow<'a, [u8]>>) -> ExternalDocumentsIds<'a> { | ||||||
|  |         ExternalDocumentsIds { hard, soft } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     pub fn into_static(self) -> ExternalDocumentsIds<'static> { | ||||||
|  |         ExternalDocumentsIds { | ||||||
|  |             hard: self.hard.map_data(|c| Cow::Owned(c.into_owned())).unwrap(), | ||||||
|  |             soft: self.soft.map_data(|c| Cow::Owned(c.into_owned())).unwrap(), | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     pub fn get<A: AsRef<[u8]>>(&self, external_id: A) -> Option<u32> { | ||||||
|  |         let external_id = external_id.as_ref(); | ||||||
|  |         match self.soft.get(external_id).or_else(|| self.hard.get(external_id)) { | ||||||
|  |             // u64 MAX means deleted in the soft fst map | ||||||
|  |             Some(id) if id != u64::MAX => Some(id.try_into().unwrap()), | ||||||
|  |             _otherwise => None | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     pub fn delete_ids<A: AsRef<[u8]>>(&mut self, other: fst::Set<A>) -> fst::Result<()> { | ||||||
|  |         let other = fst::Map::from(other.into_fst()); | ||||||
|  |         let union_op = self.soft.op().add(&other).r#union(); | ||||||
|  |  | ||||||
|  |         let mut iter = union_op.into_stream(); | ||||||
|  |         let mut new_soft_builder = fst::MapBuilder::memory(); | ||||||
|  |         while let Some((external_id, docids)) = iter.next() { | ||||||
|  |             if docids.iter().any(|v| v.index == 1) { | ||||||
|  |                 // If the `other` set returns a value here it means | ||||||
|  |                 // that it must be marked as deleted. | ||||||
|  |                 new_soft_builder.insert(external_id, u64::MAX)?; | ||||||
|  |             } else { | ||||||
|  |                 new_soft_builder.insert(external_id, docids[0].value)?; | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         drop(iter); | ||||||
|  |  | ||||||
|  |         // We save this new map as the new soft map. | ||||||
|  |         self.soft = new_soft_builder.into_map().map_data(Cow::Owned)?; | ||||||
|  |         self.merge_soft_into_hard() | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     pub fn insert_ids<A: AsRef<[u8]>>(&mut self, other: &fst::Map<A>) -> fst::Result<()> { | ||||||
|  |         let union_op = self.soft.op().add(other).r#union(); | ||||||
|  |  | ||||||
|  |         let mut new_soft_builder = fst::MapBuilder::memory(); | ||||||
|  |         let mut iter = union_op.into_stream(); | ||||||
|  |         while let Some((external_id, docids)) = iter.next() { | ||||||
|  |             let id = docids.last().unwrap().value; | ||||||
|  |             new_soft_builder.insert(external_id, id)?; | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         drop(iter); | ||||||
|  |  | ||||||
|  |         // We save the new map as the new soft map. | ||||||
|  |         self.soft = new_soft_builder.into_map().map_data(Cow::Owned)?; | ||||||
|  |         self.merge_soft_into_hard() | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     fn merge_soft_into_hard(&mut self) -> fst::Result<()> { | ||||||
|  |         if self.soft.len() >= self.hard.len() / 2 { | ||||||
|  |             let union_op = self.hard.op().add(&self.soft).r#union(); | ||||||
|  |  | ||||||
|  |             let mut iter = union_op.into_stream(); | ||||||
|  |             let mut new_hard_builder = fst::MapBuilder::memory(); | ||||||
|  |             while let Some((external_id, docids)) = iter.next() { | ||||||
|  |                 if docids.len() == 2 { | ||||||
|  |                     if docids[1].value != u64::MAX { | ||||||
|  |                         new_hard_builder.insert(external_id, docids[1].value)?; | ||||||
|  |                     } | ||||||
|  |                 } else { | ||||||
|  |                     new_hard_builder.insert(external_id, docids[0].value)?; | ||||||
|  |                 } | ||||||
|  |             } | ||||||
|  |  | ||||||
|  |             drop(iter); | ||||||
|  |  | ||||||
|  |             self.hard = new_hard_builder.into_map().map_data(Cow::Owned)?; | ||||||
|  |             self.soft = fst::Map::default().map_data(Cow::Owned)?; | ||||||
|  |         } | ||||||
|  |  | ||||||
|  |         Ok(()) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
|  | impl Default for ExternalDocumentsIds<'static> { | ||||||
|  |     fn default() -> Self { | ||||||
|  |         ExternalDocumentsIds { | ||||||
|  |             hard: fst::Map::default().map_data(Cow::Owned).unwrap(), | ||||||
|  |             soft: fst::Map::default().map_data(Cow::Owned).unwrap(), | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  |  | ||||||
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that every external id resolves to the expected internal id.
    fn assert_lookups(ids: &ExternalDocumentsIds<'_>, expected: &[(&str, Option<u32>)]) {
        for &(external_id, internal_id) in expected {
            assert_eq!(ids.get(external_id), internal_id);
        }
    }

    #[test]
    fn simple_insert_delete_ids() {
        let mut ids = ExternalDocumentsIds::default();

        // First batch of insertions.
        let batch = fst::Map::from_iter(vec![("a", 1), ("b", 2), ("c", 3), ("d", 4)]).unwrap();
        ids.insert_ids(&batch).unwrap();

        assert_lookups(&ids, &[
            ("a", Some(1)),
            ("b", Some(2)),
            ("c", Some(3)),
            ("d", Some(4)),
        ]);

        // Second batch, the previous entries must remain reachable.
        let batch = fst::Map::from_iter(vec![("e", 5), ("f", 6), ("g", 7)]).unwrap();
        ids.insert_ids(&batch).unwrap();

        assert_lookups(&ids, &[
            ("a", Some(1)),
            ("b", Some(2)),
            ("c", Some(3)),
            ("d", Some(4)),
            ("e", Some(5)),
            ("f", Some(6)),
            ("g", Some(7)),
        ]);

        // Deleted ids must not be returned anymore.
        let to_delete = fst::Set::from_iter(vec!["a", "c", "f"]).unwrap();
        ids.delete_ids(to_delete).unwrap();

        assert_lookups(&ids, &[
            ("a", None),
            ("b", Some(2)),
            ("c", None),
            ("d", Some(4)),
            ("e", Some(5)),
            ("f", None),
            ("g", Some(7)),
        ]);

        // Re-inserting a deleted id, overriding an existing one and adding a new one.
        let batch = fst::Map::from_iter(vec![("a", 5), ("b", 6), ("h", 8)]).unwrap();
        ids.insert_ids(&batch).unwrap();

        assert_lookups(&ids, &[
            ("a", Some(5)),
            ("b", Some(6)),
            ("c", None),
            ("d", Some(4)),
            ("e", Some(5)),
            ("f", None),
            ("g", Some(7)),
            ("h", Some(8)),
        ]);
    }
}
							
								
								
									
										43
									
								
								src/index.rs
									
									
									
									
									
								
							
							
						
						
									
										43
									
								
								src/index.rs
									
									
									
									
									
								
							| @@ -10,7 +10,7 @@ use roaring::RoaringBitmap; | |||||||
| use crate::facet::FacetType; | use crate::facet::FacetType; | ||||||
| use crate::fields_ids_map::FieldsIdsMap; | use crate::fields_ids_map::FieldsIdsMap; | ||||||
| use crate::Search; | use crate::Search; | ||||||
| use crate::{BEU32, DocumentId}; | use crate::{BEU32, DocumentId, ExternalDocumentsIds}; | ||||||
| use crate::{ | use crate::{ | ||||||
|     RoaringBitmapCodec, BEU32StrCodec, StrStrU8Codec, ObkvCodec, |     RoaringBitmapCodec, BEU32StrCodec, StrStrU8Codec, ObkvCodec, | ||||||
|     BoRoaringBitmapCodec, CboRoaringBitmapCodec, |     BoRoaringBitmapCodec, CboRoaringBitmapCodec, | ||||||
| @@ -22,7 +22,8 @@ pub const FACETED_FIELDS_KEY: &str = "faceted-fields"; | |||||||
| pub const FIELDS_IDS_MAP_KEY: &str = "fields-ids-map"; | pub const FIELDS_IDS_MAP_KEY: &str = "fields-ids-map"; | ||||||
| pub const PRIMARY_KEY_KEY: &str = "primary-key"; | pub const PRIMARY_KEY_KEY: &str = "primary-key"; | ||||||
| pub const SEARCHABLE_FIELDS_KEY: &str = "searchable-fields"; | pub const SEARCHABLE_FIELDS_KEY: &str = "searchable-fields"; | ||||||
| pub const USERS_IDS_DOCUMENTS_IDS_KEY: &str = "users-ids-documents-ids"; | pub const HARD_EXTERNAL_DOCUMENTS_IDS_KEY: &str = "hard-external-documents-ids"; | ||||||
|  | pub const SOFT_EXTERNAL_DOCUMENTS_IDS_KEY: &str = "soft-external-documents-ids"; | ||||||
| pub const WORDS_FST_KEY: &str = "words-fst"; | pub const WORDS_FST_KEY: &str = "words-fst"; | ||||||
|  |  | ||||||
| #[derive(Clone)] | #[derive(Clone)] | ||||||
| @@ -119,21 +120,37 @@ impl Index { | |||||||
|         self.main.get::<_, Str, OwnedType<u8>>(rtxn, PRIMARY_KEY_KEY) |         self.main.get::<_, Str, OwnedType<u8>>(rtxn, PRIMARY_KEY_KEY) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     /* users ids documents ids */ |     /* external documents ids */ | ||||||
|  |  | ||||||
|     /// Writes the users ids documents ids, a user id is a byte slice (i.e. `[u8]`) |     /// Writes the external documents ids and internal ids (i.e. `u32`). | ||||||
|     /// and refers to an internal id (i.e. `u32`). |     pub fn put_external_documents_ids<'a>( | ||||||
|     pub fn put_users_ids_documents_ids<A: AsRef<[u8]>>(&self, wtxn: &mut RwTxn, fst: &fst::Map<A>) -> heed::Result<()> { |         &self, | ||||||
|         self.main.put::<_, Str, ByteSlice>(wtxn, USERS_IDS_DOCUMENTS_IDS_KEY, fst.as_fst().as_bytes()) |         wtxn: &mut RwTxn, | ||||||
|  |         external_documents_ids: &ExternalDocumentsIds<'a>, | ||||||
|  |     ) -> heed::Result<()> | ||||||
|  |     { | ||||||
|  |         let ExternalDocumentsIds { hard, soft } = external_documents_ids; | ||||||
|  |         let hard = hard.as_fst().as_bytes(); | ||||||
|  |         let soft = soft.as_fst().as_bytes(); | ||||||
|  |         self.main.put::<_, Str, ByteSlice>(wtxn, HARD_EXTERNAL_DOCUMENTS_IDS_KEY, hard)?; | ||||||
|  |         self.main.put::<_, Str, ByteSlice>(wtxn, SOFT_EXTERNAL_DOCUMENTS_IDS_KEY, soft)?; | ||||||
|  |         Ok(()) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     /// Returns the user ids documents ids map which associate the user ids (i.e. `[u8]`) |     /// Returns the external documents ids map which associate the external ids | ||||||
|     /// with the internal ids (i.e. `u32`). |     /// with the internal ids (i.e. `u32`). | ||||||
|     pub fn users_ids_documents_ids<'t>(&self, rtxn: &'t RoTxn) -> anyhow::Result<fst::Map<Cow<'t, [u8]>>> { |     pub fn external_documents_ids<'t>(&self, rtxn: &'t RoTxn) -> anyhow::Result<ExternalDocumentsIds<'t>> { | ||||||
|         match self.main.get::<_, Str, ByteSlice>(rtxn, USERS_IDS_DOCUMENTS_IDS_KEY)? { |         let hard = self.main.get::<_, Str, ByteSlice>(rtxn, HARD_EXTERNAL_DOCUMENTS_IDS_KEY)?; | ||||||
|             Some(bytes) => Ok(fst::Map::new(bytes)?.map_data(Cow::Borrowed)?), |         let soft = self.main.get::<_, Str, ByteSlice>(rtxn, SOFT_EXTERNAL_DOCUMENTS_IDS_KEY)?; | ||||||
|             None => Ok(fst::Map::default().map_data(Cow::Owned)?), |         let hard = match hard { | ||||||
|         } |             Some(hard) => fst::Map::new(hard)?.map_data(Cow::Borrowed)?, | ||||||
|  |             None => fst::Map::default().map_data(Cow::Owned)?, | ||||||
|  |         }; | ||||||
|  |         let soft = match soft { | ||||||
|  |             Some(soft) => fst::Map::new(soft)?.map_data(Cow::Borrowed)?, | ||||||
|  |             None => fst::Map::default().map_data(Cow::Owned)?, | ||||||
|  |         }; | ||||||
|  |         Ok(ExternalDocumentsIds::new(hard, soft)) | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     /* fields ids map */ |     /* fields ids map */ | ||||||
|   | |||||||
| @@ -1,4 +1,5 @@ | |||||||
| mod criterion; | mod criterion; | ||||||
|  | mod external_documents_ids; | ||||||
| mod fields_ids_map; | mod fields_ids_map; | ||||||
| mod index; | mod index; | ||||||
| mod mdfs; | mod mdfs; | ||||||
| @@ -20,6 +21,7 @@ use fxhash::{FxHasher32, FxHasher64}; | |||||||
| use serde_json::{Map, Value}; | use serde_json::{Map, Value}; | ||||||
|  |  | ||||||
| pub use self::criterion::{Criterion, default_criteria}; | pub use self::criterion::{Criterion, default_criteria}; | ||||||
|  | pub use self::external_documents_ids::ExternalDocumentsIds; | ||||||
| pub use self::fields_ids_map::FieldsIdsMap; | pub use self::fields_ids_map::FieldsIdsMap; | ||||||
| pub use self::index::Index; | pub use self::index::Index; | ||||||
| pub use self::search::{Search, SearchResult}; | pub use self::search::{Search, SearchResult}; | ||||||
|   | |||||||
| @@ -13,6 +13,7 @@ const WORD_DOCIDS_DB_NAME: &str = "word-docids"; | |||||||
| const DOCID_WORD_POSITIONS_DB_NAME: &str = "docid-word-positions"; | const DOCID_WORD_POSITIONS_DB_NAME: &str = "docid-word-positions"; | ||||||
| const WORD_PAIR_PROXIMITY_DOCIDS_DB_NAME: &str = "word-pair-proximity-docids"; | const WORD_PAIR_PROXIMITY_DOCIDS_DB_NAME: &str = "word-pair-proximity-docids"; | ||||||
| const DOCUMENTS_DB_NAME: &str = "documents"; | const DOCUMENTS_DB_NAME: &str = "documents"; | ||||||
|  | const USERS_IDS_DOCUMENTS_IDS: &[u8] = b"users-ids-documents-ids"; | ||||||
|  |  | ||||||
| const ALL_DATABASE_NAMES: &[&str] = &[ | const ALL_DATABASE_NAMES: &[&str] = &[ | ||||||
|     MAIN_DB_NAME, |     MAIN_DB_NAME, | ||||||
| @@ -137,6 +138,10 @@ enum Command { | |||||||
|         #[structopt(short, long, default_value = "words.fst")] |         #[structopt(short, long, default_value = "words.fst")] | ||||||
|         output: PathBuf, |         output: PathBuf, | ||||||
|     }, |     }, | ||||||
|  |  | ||||||
|  |     /// A command that patches the old external ids | ||||||
|  |     /// into the new external ids format. | ||||||
|  |     PatchToNewExternalIds, | ||||||
| } | } | ||||||
|  |  | ||||||
| pub fn run(opt: Opt) -> anyhow::Result<()> { | pub fn run(opt: Opt) -> anyhow::Result<()> { | ||||||
| @@ -171,9 +176,32 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { | |||||||
|             word_pair_proximities_docids(&index, &rtxn, !full_display, word1, word2) |             word_pair_proximities_docids(&index, &rtxn, !full_display, word1, word2) | ||||||
|         }, |         }, | ||||||
|         ExportWordsFst { output } => export_words_fst(&index, &rtxn, output), |         ExportWordsFst { output } => export_words_fst(&index, &rtxn, output), | ||||||
|  |         PatchToNewExternalIds => { | ||||||
|  |             drop(rtxn); | ||||||
|  |             let mut wtxn = index.write_txn()?; | ||||||
|  |             let result = patch_to_new_external_ids(&index, &mut wtxn); | ||||||
|  |             wtxn.commit()?; | ||||||
|  |             result | ||||||
|  |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
|  | fn patch_to_new_external_ids(index: &Index, wtxn: &mut heed::RwTxn) -> anyhow::Result<()> { | ||||||
|  |     use heed::types::ByteSlice; | ||||||
|  |  | ||||||
|  |     if let Some(documents_ids) = index.main.get::<_, ByteSlice, ByteSlice>(wtxn, USERS_IDS_DOCUMENTS_IDS)? { | ||||||
|  |         let documents_ids = documents_ids.to_owned(); | ||||||
|  |         index.main.put::<_, ByteSlice, ByteSlice>( | ||||||
|  |             wtxn, | ||||||
|  |             crate::index::HARD_EXTERNAL_DOCUMENTS_IDS_KEY.as_bytes(), | ||||||
|  |             &documents_ids, | ||||||
|  |         )?; | ||||||
|  |         index.main.delete::<_, ByteSlice>(wtxn, USERS_IDS_DOCUMENTS_IDS)?; | ||||||
|  |     } | ||||||
|  |  | ||||||
|  |     Ok(()) | ||||||
|  | } | ||||||
|  |  | ||||||
| fn most_common_words(index: &Index, rtxn: &heed::RoTxn, limit: usize) -> anyhow::Result<()> { | fn most_common_words(index: &Index, rtxn: &heed::RoTxn, limit: usize) -> anyhow::Result<()> { | ||||||
|     use std::collections::BinaryHeap; |     use std::collections::BinaryHeap; | ||||||
|     use std::cmp::Reverse; |     use std::cmp::Reverse; | ||||||
|   | |||||||
| @@ -1,5 +1,5 @@ | |||||||
| use roaring::RoaringBitmap; | use roaring::RoaringBitmap; | ||||||
| use crate::Index; | use crate::{ExternalDocumentsIds, Index}; | ||||||
|  |  | ||||||
| pub struct ClearDocuments<'t, 'u, 'i> { | pub struct ClearDocuments<'t, 'u, 'i> { | ||||||
|     wtxn: &'t mut heed::RwTxn<'i, 'u>, |     wtxn: &'t mut heed::RwTxn<'i, 'u>, | ||||||
| @@ -27,7 +27,7 @@ impl<'t, 'u, 'i> ClearDocuments<'t, 'u, 'i> { | |||||||
|  |  | ||||||
|         // We clean some of the main engine datastructures. |         // We clean some of the main engine datastructures. | ||||||
|         self.index.put_words_fst(self.wtxn, &fst::Set::default())?; |         self.index.put_words_fst(self.wtxn, &fst::Set::default())?; | ||||||
|         self.index.put_users_ids_documents_ids(self.wtxn, &fst::Map::default())?; |         self.index.put_external_documents_ids(self.wtxn, &ExternalDocumentsIds::default())?; | ||||||
|         self.index.put_documents_ids(self.wtxn, &RoaringBitmap::default())?; |         self.index.put_documents_ids(self.wtxn, &RoaringBitmap::default())?; | ||||||
|  |  | ||||||
|         // Clear the other databases. |         // Clear the other databases. | ||||||
|   | |||||||
| @@ -1,16 +1,13 @@ | |||||||
| use std::borrow::Cow; | use fst::IntoStreamer; | ||||||
| use std::convert::TryFrom; |  | ||||||
|  |  | ||||||
| use fst::{IntoStreamer, Streamer}; |  | ||||||
| use roaring::RoaringBitmap; | use roaring::RoaringBitmap; | ||||||
|  |  | ||||||
| use crate::{Index, BEU32, SmallString32}; | use crate::{Index, BEU32, SmallString32, ExternalDocumentsIds}; | ||||||
| use super::ClearDocuments; | use super::ClearDocuments; | ||||||
|  |  | ||||||
| pub struct DeleteDocuments<'t, 'u, 'i> { | pub struct DeleteDocuments<'t, 'u, 'i> { | ||||||
|     wtxn: &'t mut heed::RwTxn<'i, 'u>, |     wtxn: &'t mut heed::RwTxn<'i, 'u>, | ||||||
|     index: &'i Index, |     index: &'i Index, | ||||||
|     users_ids_documents_ids: fst::Map<Vec<u8>>, |     external_documents_ids: ExternalDocumentsIds<'static>, | ||||||
|     documents_ids: RoaringBitmap, |     documents_ids: RoaringBitmap, | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -20,14 +17,14 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { | |||||||
|         index: &'i Index, |         index: &'i Index, | ||||||
|     ) -> anyhow::Result<DeleteDocuments<'t, 'u, 'i>> |     ) -> anyhow::Result<DeleteDocuments<'t, 'u, 'i>> | ||||||
|     { |     { | ||||||
|         let users_ids_documents_ids = index |         let external_documents_ids = index | ||||||
|             .users_ids_documents_ids(wtxn)? |             .external_documents_ids(wtxn)? | ||||||
|             .map_data(Cow::into_owned)?; |             .into_static(); | ||||||
|  |  | ||||||
|         Ok(DeleteDocuments { |         Ok(DeleteDocuments { | ||||||
|             wtxn, |             wtxn, | ||||||
|             index, |             index, | ||||||
|             users_ids_documents_ids, |             external_documents_ids, | ||||||
|             documents_ids: RoaringBitmap::new(), |             documents_ids: RoaringBitmap::new(), | ||||||
|         }) |         }) | ||||||
|     } |     } | ||||||
| @@ -40,8 +37,8 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { | |||||||
|         self.documents_ids.union_with(docids); |         self.documents_ids.union_with(docids); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     pub fn delete_user_id(&mut self, user_id: &str) -> Option<u32> { |     pub fn delete_external_id(&mut self, external_id: &str) -> Option<u32> { | ||||||
|         let docid = self.users_ids_documents_ids.get(user_id).map(|id| u32::try_from(id).unwrap())?; |         let docid = self.external_documents_ids.get(external_id)?; | ||||||
|         self.delete_document(docid); |         self.delete_document(docid); | ||||||
|         Some(docid) |         Some(docid) | ||||||
|     } |     } | ||||||
| @@ -80,9 +77,9 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { | |||||||
|             documents, |             documents, | ||||||
|         } = self.index; |         } = self.index; | ||||||
|  |  | ||||||
|         // Retrieve the words and the users ids contained in the documents. |         // Retrieve the words and the external documents ids contained in the documents. | ||||||
|         let mut words = Vec::new(); |         let mut words = Vec::new(); | ||||||
|         let mut users_ids = Vec::new(); |         let mut external_ids = Vec::new(); | ||||||
|         for docid in &self.documents_ids { |         for docid in &self.documents_ids { | ||||||
|             // We create an iterator to be able to get the content and delete the document |             // We create an iterator to be able to get the content and delete the document | ||||||
|             // content itself. It's faster to acquire a cursor to get and delete, |             // content itself. It's faster to acquire a cursor to get and delete, | ||||||
| @@ -91,8 +88,8 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { | |||||||
|             let mut iter = documents.range_mut(self.wtxn, &(key..=key))?; |             let mut iter = documents.range_mut(self.wtxn, &(key..=key))?; | ||||||
|             if let Some((_key, obkv)) = iter.next().transpose()? { |             if let Some((_key, obkv)) = iter.next().transpose()? { | ||||||
|                 if let Some(content) = obkv.get(id_field) { |                 if let Some(content) = obkv.get(id_field) { | ||||||
|                     let user_id: SmallString32 = serde_json::from_slice(content).unwrap(); |                     let external_id: SmallString32 = serde_json::from_slice(content).unwrap(); | ||||||
|                     users_ids.push(user_id); |                     external_ids.push(external_id); | ||||||
|                 } |                 } | ||||||
|                 iter.del_current()?; |                 iter.del_current()?; | ||||||
|             } |             } | ||||||
| @@ -109,30 +106,18 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { | |||||||
|             } |             } | ||||||
|         } |         } | ||||||
|  |  | ||||||
|         // We create the FST map of the users ids that we must delete. |         // We create the FST map of the external ids that we must delete. | ||||||
|         users_ids.sort_unstable(); |         external_ids.sort_unstable(); | ||||||
|         let users_ids_to_delete = fst::Set::from_iter(users_ids.iter().map(AsRef::as_ref))?; |         let external_ids_to_delete = fst::Set::from_iter(external_ids.iter().map(AsRef::as_ref))?; | ||||||
|         let users_ids_to_delete = fst::Map::from(users_ids_to_delete.into_fst()); |  | ||||||
|  |  | ||||||
|         let new_users_ids_documents_ids = { |         // We acquire the current external documents ids map... | ||||||
|             // We acquire the current users ids documents ids map and create |         let mut new_external_documents_ids = self.index.external_documents_ids(self.wtxn)?; | ||||||
|             // a difference operation between the current and to-delete users ids. |         // ...and remove the to-delete external ids. | ||||||
|             let users_ids_documents_ids = self.index.users_ids_documents_ids(self.wtxn)?; |         new_external_documents_ids.delete_ids(external_ids_to_delete)?; | ||||||
|             let difference = users_ids_documents_ids.op().add(&users_ids_to_delete).difference(); |  | ||||||
|  |  | ||||||
|             // We stream the new users ids that does no more contains the to-delete users ids. |         // We write the new external ids into the main database. | ||||||
|             let mut iter = difference.into_stream(); |         let new_external_documents_ids = new_external_documents_ids.into_static(); | ||||||
|             let mut new_users_ids_documents_ids_builder = fst::MapBuilder::memory(); |         self.index.put_external_documents_ids(self.wtxn, &new_external_documents_ids)?; | ||||||
|             while let Some((userid, docids)) = iter.next() { |  | ||||||
|                 new_users_ids_documents_ids_builder.insert(userid, docids[0].value)?; |  | ||||||
|             } |  | ||||||
|  |  | ||||||
|             // We create an FST map from the above builder. |  | ||||||
|             new_users_ids_documents_ids_builder.into_map() |  | ||||||
|         }; |  | ||||||
|  |  | ||||||
|         // We write the new users ids into the main database. |  | ||||||
|         self.index.put_users_ids_documents_ids(self.wtxn, &new_users_ids_documents_ids)?; |  | ||||||
|  |  | ||||||
|         // Maybe we can improve the get performance of the words |         // Maybe we can improve the get performance of the words | ||||||
|         // if we sort the words first, keeping the LMDB pages in cache. |         // if we sort the words first, keeping the LMDB pages in cache. | ||||||
| @@ -169,7 +154,7 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> { | |||||||
|             let words_fst = self.index.words_fst(self.wtxn)?; |             let words_fst = self.index.words_fst(self.wtxn)?; | ||||||
|             let difference = words_fst.op().add(&words_to_delete).difference(); |             let difference = words_fst.op().add(&words_to_delete).difference(); | ||||||
|  |  | ||||||
|             // We stream the new users ids that does no more contains the to-delete users ids. |             // We stream the new words that no longer contain the to-delete words. | ||||||
|             let mut new_words_fst_builder = fst::SetBuilder::memory(); |             let mut new_words_fst_builder = fst::SetBuilder::memory(); | ||||||
|             new_words_fst_builder.extend_stream(difference.into_stream())?; |             new_words_fst_builder.extend_stream(difference.into_stream())?; | ||||||
|  |  | ||||||
|   | |||||||
| @@ -287,7 +287,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { | |||||||
|         let TransformOutput { |         let TransformOutput { | ||||||
|             primary_key, |             primary_key, | ||||||
|             fields_ids_map, |             fields_ids_map, | ||||||
|             users_ids_documents_ids, |             external_documents_ids, | ||||||
|             new_documents_ids, |             new_documents_ids, | ||||||
|             replaced_documents_ids, |             replaced_documents_ids, | ||||||
|             documents_count, |             documents_count, | ||||||
| @@ -472,8 +472,8 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { | |||||||
|         // We write the primary key field id into the main database |         // We write the primary key field id into the main database | ||||||
|         self.index.put_primary_key(self.wtxn, primary_key)?; |         self.index.put_primary_key(self.wtxn, primary_key)?; | ||||||
|  |  | ||||||
|         // We write the users_ids_documents_ids into the main database. |         // We write the external documents ids into the main database. | ||||||
|         self.index.put_users_ids_documents_ids(self.wtxn, &users_ids_documents_ids)?; |         self.index.put_external_documents_ids(self.wtxn, &external_documents_ids)?; | ||||||
|  |  | ||||||
|         // We merge the new documents ids with the existing ones. |         // We merge the new documents ids with the existing ones. | ||||||
|         documents_ids.union_with(&new_documents_ids); |         documents_ids.union_with(&new_documents_ids); | ||||||
|   | |||||||
| @@ -3,14 +3,15 @@ use std::convert::TryFrom; | |||||||
| use std::fs::File; | use std::fs::File; | ||||||
| use std::io::{Read, Seek, SeekFrom}; | use std::io::{Read, Seek, SeekFrom}; | ||||||
| use std::iter::Peekable; | use std::iter::Peekable; | ||||||
|  | use std::time::Instant; | ||||||
|  |  | ||||||
| use anyhow::{anyhow, Context}; | use anyhow::{anyhow, Context}; | ||||||
| use fst::{IntoStreamer, Streamer}; |  | ||||||
| use grenad::CompressionType; | use grenad::CompressionType; | ||||||
|  | use log::info; | ||||||
| use roaring::RoaringBitmap; | use roaring::RoaringBitmap; | ||||||
| use serde_json::{Map, Value}; | use serde_json::{Map, Value}; | ||||||
|  |  | ||||||
| use crate::{BEU32, MergeFn, Index, FieldsIdsMap}; | use crate::{BEU32, MergeFn, Index, FieldsIdsMap, ExternalDocumentsIds}; | ||||||
| use crate::update::{AvailableDocumentsIds, UpdateIndexingStep}; | use crate::update::{AvailableDocumentsIds, UpdateIndexingStep}; | ||||||
| use super::merge_function::merge_two_obkvs; | use super::merge_function::merge_two_obkvs; | ||||||
| use super::{create_writer, create_sorter, IndexDocumentsMethod}; | use super::{create_writer, create_sorter, IndexDocumentsMethod}; | ||||||
| @@ -18,14 +19,14 @@ use super::{create_writer, create_sorter, IndexDocumentsMethod}; | |||||||
| pub struct TransformOutput { | pub struct TransformOutput { | ||||||
|     pub primary_key: u8, |     pub primary_key: u8, | ||||||
|     pub fields_ids_map: FieldsIdsMap, |     pub fields_ids_map: FieldsIdsMap, | ||||||
|     pub users_ids_documents_ids: fst::Map<Vec<u8>>, |     pub external_documents_ids: ExternalDocumentsIds<'static>, | ||||||
|     pub new_documents_ids: RoaringBitmap, |     pub new_documents_ids: RoaringBitmap, | ||||||
|     pub replaced_documents_ids: RoaringBitmap, |     pub replaced_documents_ids: RoaringBitmap, | ||||||
|     pub documents_count: usize, |     pub documents_count: usize, | ||||||
|     pub documents_file: File, |     pub documents_file: File, | ||||||
| } | } | ||||||
|  |  | ||||||
| /// Extract the users ids, deduplicate and compute the new internal documents ids | /// Extract the external ids, deduplicate and compute the new internal documents ids | ||||||
| /// and fields ids, writing all the documents under their internal ids into a final file. | /// and fields ids, writing all the documents under their internal ids into a final file. | ||||||
| /// | /// | ||||||
| /// Outputs the new `FieldsIdsMap`, the new `ExternalDocumentsIds` map, the new documents ids, | /// Outputs the new `FieldsIdsMap`, the new `ExternalDocumentsIds` map, the new documents ids, | ||||||
| @@ -72,7 +73,7 @@ impl Transform<'_, '_> { | |||||||
|         F: Fn(UpdateIndexingStep) + Sync, |         F: Fn(UpdateIndexingStep) + Sync, | ||||||
|     { |     { | ||||||
|         let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?; |         let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?; | ||||||
|         let users_ids_documents_ids = self.index.users_ids_documents_ids(self.rtxn).unwrap(); |         let external_documents_ids = self.index.external_documents_ids(self.rtxn).unwrap(); | ||||||
|         let primary_key = self.index.primary_key(self.rtxn)?; |         let primary_key = self.index.primary_key(self.rtxn)?; | ||||||
|  |  | ||||||
|         // Deserialize the whole batch of documents in memory. |         // Deserialize the whole batch of documents in memory. | ||||||
| @@ -114,7 +115,7 @@ impl Transform<'_, '_> { | |||||||
|             return Ok(TransformOutput { |             return Ok(TransformOutput { | ||||||
|                 primary_key, |                 primary_key, | ||||||
|                 fields_ids_map, |                 fields_ids_map, | ||||||
|                 users_ids_documents_ids: fst::Map::default(), |                 external_documents_ids: ExternalDocumentsIds::default(), | ||||||
|                 new_documents_ids: RoaringBitmap::new(), |                 new_documents_ids: RoaringBitmap::new(), | ||||||
|                 replaced_documents_ids: RoaringBitmap::new(), |                 replaced_documents_ids: RoaringBitmap::new(), | ||||||
|                 documents_count: 0, |                 documents_count: 0, | ||||||
| @@ -170,7 +171,7 @@ impl Transform<'_, '_> { | |||||||
|  |  | ||||||
|             // We retrieve the user id from the document based on the primary key name, |             // We retrieve the user id from the document based on the primary key name, | ||||||
|             // if the document id isn't present we generate a uuid. |             // if the document id isn't present we generate a uuid. | ||||||
|             let user_id = match document.get(&primary_key_name) { |             let external_id = match document.get(&primary_key_name) { | ||||||
|                 Some(value) => match value { |                 Some(value) => match value { | ||||||
|                     Value::String(string) => Cow::Borrowed(string.as_str()), |                     Value::String(string) => Cow::Borrowed(string.as_str()), | ||||||
|                     Value::Number(number) => Cow::Owned(number.to_string()), |                     Value::Number(number) => Cow::Owned(number.to_string()), | ||||||
| @@ -198,19 +199,19 @@ impl Transform<'_, '_> { | |||||||
|                 } |                 } | ||||||
|                 else if field_id == primary_key { |                 else if field_id == primary_key { | ||||||
|                     // We validate the document id [a-zA-Z0-9\-_]. |                     // We validate the document id [a-zA-Z0-9\-_]. | ||||||
|                     let user_id = match validate_document_id(&user_id) { |                     let external_id = match validate_document_id(&external_id) { | ||||||
|                         Some(valid) => valid, |                         Some(valid) => valid, | ||||||
|                         None => return Err(anyhow!("invalid document id: {:?}", user_id)), |                         None => return Err(anyhow!("invalid document id: {:?}", external_id)), | ||||||
|                     }; |                     }; | ||||||
|  |  | ||||||
|                     // We serialize the document id. |                     // We serialize the document id. | ||||||
|                     serde_json::to_writer(&mut json_buffer, &user_id)?; |                     serde_json::to_writer(&mut json_buffer, &external_id)?; | ||||||
|                     writer.insert(field_id, &json_buffer)?; |                     writer.insert(field_id, &json_buffer)?; | ||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|  |  | ||||||
|             // We use the extracted/generated user id as the key for this document. |             // We use the extracted/generated user id as the key for this document. | ||||||
|             sorter.insert(user_id.as_bytes(), &obkv_buffer)?; |             sorter.insert(external_id.as_bytes(), &obkv_buffer)?; | ||||||
|             documents_count += 1; |             documents_count += 1; | ||||||
|         } |         } | ||||||
|  |  | ||||||
| @@ -225,7 +226,7 @@ impl Transform<'_, '_> { | |||||||
|             primary_key, |             primary_key, | ||||||
|             fields_ids_map, |             fields_ids_map, | ||||||
|             documents_count, |             documents_count, | ||||||
|             users_ids_documents_ids, |             external_documents_ids, | ||||||
|             progress_callback, |             progress_callback, | ||||||
|         ) |         ) | ||||||
|     } |     } | ||||||
| @@ -236,7 +237,7 @@ impl Transform<'_, '_> { | |||||||
|         F: Fn(UpdateIndexingStep) + Sync, |         F: Fn(UpdateIndexingStep) + Sync, | ||||||
|     { |     { | ||||||
|         let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?; |         let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?; | ||||||
|         let users_ids_documents_ids = self.index.users_ids_documents_ids(self.rtxn).unwrap(); |         let external_documents_ids = self.index.external_documents_ids(self.rtxn).unwrap(); | ||||||
|  |  | ||||||
|         let mut csv = csv::Reader::from_reader(reader); |         let mut csv = csv::Reader::from_reader(reader); | ||||||
|         let headers = csv.headers()?; |         let headers = csv.headers()?; | ||||||
| @@ -250,7 +251,7 @@ impl Transform<'_, '_> { | |||||||
|         } |         } | ||||||
|  |  | ||||||
|         // Extract the position of the primary key in the current headers, None if not found. |         // Extract the position of the primary key in the current headers, None if not found. | ||||||
|         let user_id_pos = match primary_key { |         let external_id_pos = match primary_key { | ||||||
|             Some(primary_key) => { |             Some(primary_key) => { | ||||||
|                 // The primary key is known so we must find its position in the CSV headers. |                 // The primary key is known so we must find its position in the CSV headers. | ||||||
|                 let name = fields_ids_map.name(primary_key).expect("found the primary key name"); |                 let name = fields_ids_map.name(primary_key).expect("found the primary key name"); | ||||||
| @@ -261,7 +262,7 @@ impl Transform<'_, '_> { | |||||||
|  |  | ||||||
|         // Returns the field id in the fields ids map, creating an "id" field |         // Returns the field id in the fields ids map, creating an "id" field | ||||||
|         // in case it is not in the current headers. |         // in case it is not in the current headers. | ||||||
|         let primary_key_field_id = match user_id_pos { |         let primary_key_field_id = match external_id_pos { | ||||||
|             Some(pos) => fields_ids_map.id(&headers[pos]).expect("found the primary key"), |             Some(pos) => fields_ids_map.id(&headers[pos]).expect("found the primary key"), | ||||||
|             None => { |             None => { | ||||||
|                 if !self.autogenerate_docids { |                 if !self.autogenerate_docids { | ||||||
| @@ -292,7 +293,7 @@ impl Transform<'_, '_> { | |||||||
|         ); |         ); | ||||||
|  |  | ||||||
|         // We write into the sorter to merge and deduplicate the documents |         // We write into the sorter to merge and deduplicate the documents | ||||||
|         // based on the users ids. |         // based on the external ids. | ||||||
|         let mut json_buffer = Vec::new(); |         let mut json_buffer = Vec::new(); | ||||||
|         let mut obkv_buffer = Vec::new(); |         let mut obkv_buffer = Vec::new(); | ||||||
|         let mut uuid_buffer = [0; uuid::adapter::Hyphenated::LENGTH]; |         let mut uuid_buffer = [0; uuid::adapter::Hyphenated::LENGTH]; | ||||||
| @@ -310,13 +311,13 @@ impl Transform<'_, '_> { | |||||||
|             } |             } | ||||||
|  |  | ||||||
|             // We extract the user id if we know where it is or generate an UUID V4 otherwise. |             // We extract the user id if we know where it is or generate an UUID V4 otherwise. | ||||||
|             let user_id = match user_id_pos { |             let external_id = match external_id_pos { | ||||||
|                 Some(pos) => { |                 Some(pos) => { | ||||||
|                     let user_id = &record[pos]; |                     let external_id = &record[pos]; | ||||||
|                     // We validate the document id [a-zA-Z0-9\-_]. |                     // We validate the document id [a-zA-Z0-9\-_]. | ||||||
|                     match validate_document_id(&user_id) { |                     match validate_document_id(&external_id) { | ||||||
|                         Some(valid) => valid, |                         Some(valid) => valid, | ||||||
|                         None => return Err(anyhow!("invalid document id: {:?}", user_id)), |                         None => return Err(anyhow!("invalid document id: {:?}", external_id)), | ||||||
|                     } |                     } | ||||||
|                 }, |                 }, | ||||||
|                 None => uuid::Uuid::new_v4().to_hyphenated().encode_lower(&mut uuid_buffer), |                 None => uuid::Uuid::new_v4().to_hyphenated().encode_lower(&mut uuid_buffer), | ||||||
| @@ -326,7 +327,7 @@ impl Transform<'_, '_> { | |||||||
|             // we return the generated document id instead of the record field. |             // we return the generated document id instead of the record field. | ||||||
|             let iter = fields_ids.iter() |             let iter = fields_ids.iter() | ||||||
|                 .map(|(fi, i)| { |                 .map(|(fi, i)| { | ||||||
|                     let field = if *fi == primary_key_field_id { user_id } else { &record[*i] }; |                     let field = if *fi == primary_key_field_id { external_id } else { &record[*i] }; | ||||||
|                     (fi, field) |                     (fi, field) | ||||||
|                 }); |                 }); | ||||||
|  |  | ||||||
| @@ -339,7 +340,7 @@ impl Transform<'_, '_> { | |||||||
|             } |             } | ||||||
|  |  | ||||||
|             // We use the extracted/generated user id as the key for this document. |             // We use the extracted/generated user id as the key for this document. | ||||||
|             sorter.insert(user_id, &obkv_buffer)?; |             sorter.insert(external_id, &obkv_buffer)?; | ||||||
|             documents_count += 1; |             documents_count += 1; | ||||||
|         } |         } | ||||||
|  |  | ||||||
| @@ -354,7 +355,7 @@ impl Transform<'_, '_> { | |||||||
|             primary_key_field_id, |             primary_key_field_id, | ||||||
|             fields_ids_map, |             fields_ids_map, | ||||||
|             documents_count, |             documents_count, | ||||||
|             users_ids_documents_ids, |             external_documents_ids, | ||||||
|             progress_callback, |             progress_callback, | ||||||
|         ) |         ) | ||||||
|     } |     } | ||||||
| @@ -368,7 +369,7 @@ impl Transform<'_, '_> { | |||||||
|         primary_key: u8, |         primary_key: u8, | ||||||
|         fields_ids_map: FieldsIdsMap, |         fields_ids_map: FieldsIdsMap, | ||||||
|         approximate_number_of_documents: usize, |         approximate_number_of_documents: usize, | ||||||
|         users_ids_documents_ids: fst::Map<Cow<'_, [u8]>>, |         mut external_documents_ids: ExternalDocumentsIds<'_>, | ||||||
|         progress_callback: F, |         progress_callback: F, | ||||||
|     ) -> anyhow::Result<TransformOutput> |     ) -> anyhow::Result<TransformOutput> | ||||||
|     where |     where | ||||||
| @@ -386,7 +387,7 @@ impl Transform<'_, '_> { | |||||||
|             self.max_nb_chunks, |             self.max_nb_chunks, | ||||||
|             self.max_memory, |             self.max_memory, | ||||||
|         ); |         ); | ||||||
|         let mut new_users_ids_documents_ids_builder = fst::MapBuilder::memory(); |         let mut new_external_documents_ids_builder = fst::MapBuilder::memory(); | ||||||
|         let mut replaced_documents_ids = RoaringBitmap::new(); |         let mut replaced_documents_ids = RoaringBitmap::new(); | ||||||
|         let mut new_documents_ids = RoaringBitmap::new(); |         let mut new_documents_ids = RoaringBitmap::new(); | ||||||
|         let mut obkv_buffer = Vec::new(); |         let mut obkv_buffer = Vec::new(); | ||||||
| @@ -394,7 +395,7 @@ impl Transform<'_, '_> { | |||||||
|         // While we write into final file we get or generate the internal documents ids. |         // While we write into final file we get or generate the internal documents ids. | ||||||
|         let mut documents_count = 0; |         let mut documents_count = 0; | ||||||
|         let mut iter = sorter.into_iter()?; |         let mut iter = sorter.into_iter()?; | ||||||
|         while let Some((user_id, update_obkv)) = iter.next()? { |         while let Some((external_id, update_obkv)) = iter.next()? { | ||||||
|  |  | ||||||
|             if self.log_every_n.map_or(false, |len| documents_count % len == 0) { |             if self.log_every_n.map_or(false, |len| documents_count % len == 0) { | ||||||
|                 progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments { |                 progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments { | ||||||
| @@ -403,9 +404,9 @@ impl Transform<'_, '_> { | |||||||
|                 }); |                 }); | ||||||
|             } |             } | ||||||
|  |  | ||||||
|             let (docid, obkv) = match users_ids_documents_ids.get(user_id) { |             let (docid, obkv) = match external_documents_ids.get(external_id) { | ||||||
|                 Some(docid) => { |                 Some(docid) => { | ||||||
|                     // If we find the user id in the current users ids documents ids map |                     // If we find the user id in the current external documents ids map | ||||||
|                     // we use it and insert it in the list of replaced documents. |                     // we use it and insert it in the list of replaced documents. | ||||||
|                     let docid = u32::try_from(docid).expect("valid document id"); |                     let docid = u32::try_from(docid).expect("valid document id"); | ||||||
|                     replaced_documents_ids.insert(docid); |                     replaced_documents_ids.insert(docid); | ||||||
| @@ -425,11 +426,11 @@ impl Transform<'_, '_> { | |||||||
|                     } |                     } | ||||||
|                 }, |                 }, | ||||||
|                 None => { |                 None => { | ||||||
|                     // If this user id is new we add it to the users ids documents ids map |                     // If this user id is new we add it to the external documents ids map | ||||||
|                     // for new ids and into the list of new documents. |                     // for new ids and into the list of new documents. | ||||||
|                     let new_docid = available_documents_ids.next() |                     let new_docid = available_documents_ids.next() | ||||||
|                         .context("no more available documents ids")?; |                         .context("no more available documents ids")?; | ||||||
|                     new_users_ids_documents_ids_builder.insert(user_id, new_docid as u64)?; |                     new_external_documents_ids_builder.insert(external_id, new_docid as u64)?; | ||||||
|                     new_documents_ids.insert(new_docid); |                     new_documents_ids.insert(new_docid); | ||||||
|                     (new_docid, update_obkv) |                     (new_docid, update_obkv) | ||||||
|                 }, |                 }, | ||||||
| @@ -455,25 +456,17 @@ impl Transform<'_, '_> { | |||||||
|         let mut documents_file = writer.into_inner()?; |         let mut documents_file = writer.into_inner()?; | ||||||
|         documents_file.seek(SeekFrom::Start(0))?; |         documents_file.seek(SeekFrom::Start(0))?; | ||||||
|  |  | ||||||
|         // We create the union between the existing users ids documents ids with the new ones. |         let before_docids_merging = Instant::now(); | ||||||
|         let new_users_ids_documents_ids = new_users_ids_documents_ids_builder.into_map(); |         // We merge the new external ids with existing external documents ids. | ||||||
|         let union_ = fst::map::OpBuilder::new() |         let new_external_documents_ids = new_external_documents_ids_builder.into_map(); | ||||||
|             .add(&users_ids_documents_ids) |         external_documents_ids.insert_ids(&new_external_documents_ids)?; | ||||||
|             .add(&new_users_ids_documents_ids) |  | ||||||
|             .r#union(); |  | ||||||
|  |  | ||||||
|         // We stream and merge the new users ids documents ids map with the existing one. |         info!("Documents external merging took {:.02?}", before_docids_merging.elapsed()); | ||||||
|         let mut users_ids_documents_ids_builder = fst::MapBuilder::memory(); |  | ||||||
|         let mut iter = union_.into_stream(); |  | ||||||
|         while let Some((user_id, vals)) = iter.next() { |  | ||||||
|             assert_eq!(vals.len(), 1, "there must be exactly one document id"); |  | ||||||
|             users_ids_documents_ids_builder.insert(user_id, vals[0].value)?; |  | ||||||
|         } |  | ||||||
|  |  | ||||||
|         Ok(TransformOutput { |         Ok(TransformOutput { | ||||||
|             primary_key, |             primary_key, | ||||||
|             fields_ids_map, |             fields_ids_map, | ||||||
|             users_ids_documents_ids: users_ids_documents_ids_builder.into_map(), |             external_documents_ids: external_documents_ids.into_static(), | ||||||
|             new_documents_ids, |             new_documents_ids, | ||||||
|             replaced_documents_ids, |             replaced_documents_ids, | ||||||
|             documents_count, |             documents_count, | ||||||
| @@ -491,7 +484,7 @@ impl Transform<'_, '_> { | |||||||
|     ) -> anyhow::Result<TransformOutput> |     ) -> anyhow::Result<TransformOutput> | ||||||
|     { |     { | ||||||
|         let current_fields_ids_map = self.index.fields_ids_map(self.rtxn)?; |         let current_fields_ids_map = self.index.fields_ids_map(self.rtxn)?; | ||||||
|         let users_ids_documents_ids = self.index.users_ids_documents_ids(self.rtxn)?; |         let external_documents_ids = self.index.external_documents_ids(self.rtxn)?; | ||||||
|         let documents_ids = self.index.documents_ids(self.rtxn)?; |         let documents_ids = self.index.documents_ids(self.rtxn)?; | ||||||
|         let documents_count = documents_ids.len() as usize; |         let documents_count = documents_ids.len() as usize; | ||||||
|  |  | ||||||
| @@ -526,7 +519,7 @@ impl Transform<'_, '_> { | |||||||
|         Ok(TransformOutput { |         Ok(TransformOutput { | ||||||
|             primary_key, |             primary_key, | ||||||
|             fields_ids_map, |             fields_ids_map, | ||||||
|             users_ids_documents_ids: users_ids_documents_ids.map_data(Cow::into_owned)?, |             external_documents_ids: external_documents_ids.into_static(), | ||||||
|             new_documents_ids: documents_ids, |             new_documents_ids: documents_ids, | ||||||
|             replaced_documents_ids: RoaringBitmap::default(), |             replaced_documents_ids: RoaringBitmap::default(), | ||||||
|             documents_count, |             documents_count, | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user