First compiling version with compressed documents iterators

Clément Renault
2024-07-02 11:08:10 +02:00
parent 2f0567fad1
commit e9d6b4222b
16 changed files with 197 additions and 99 deletions

View File

@@ -24,6 +24,7 @@ impl heed::BytesEncode<'_> for ObkvCompressedCodec {
pub struct CompressedKvReaderU16<'a>(&'a [u8]);
impl<'a> CompressedKvReaderU16<'a> {
/// Decompresses the KvReader into the buffer using the provided dictionary.
pub fn decompress_with<'b>(
&self,
buffer: &'b mut Vec<u8>,
@@ -38,6 +39,11 @@ impl<'a> CompressedKvReaderU16<'a> {
)?;
Ok(KvReaderU16::new(&buffer[..size]))
}
/// Returns the KvReader as if it were not compressed. Happens when there is no dictionary yet.
pub fn as_non_compressed(&self) -> KvReaderU16<'a> {
KvReaderU16::new(self.0)
}
}
pub struct CompressedKvWriterU16(Vec<u8>);
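
Note: every call site in this commit repeats the same two-path read: `decompress_with` when a compression dictionary is stored, `as_non_compressed` when none has been written yet. A minimal usage sketch of that pattern follows (the helper name, the `Vec<DocumentId>` input and the printed output are illustrative only, not part of the commit):

use heed::RoTxn;
use crate::{DocumentId, Index, Result};

/// Illustrative only: reads documents through the new compressed path.
fn print_first_fields<'t>(index: &Index, rtxn: &'t RoTxn<'t>, ids: Vec<DocumentId>) -> Result<()> {
    // The dictionary is optional: it only exists once it has been written with
    // `put_document_compression_dictionary`; until then documents are stored raw.
    let dictionary = index.document_compression_dictionary(rtxn)?;
    let mut buffer = Vec::new();
    for result in index.iter_compressed_documents(rtxn, ids)? {
        let (docid, compressed) = result?;
        // Decompress into the reusable buffer, or read the bytes as a plain
        // obkv when no dictionary is stored yet.
        let obkv = match dictionary {
            // The commit itself still unwraps here, see the TODOs below.
            Some(dict) => compressed.decompress_with(&mut buffer, dict).unwrap(),
            None => compressed.as_non_compressed(),
        };
        if let Some(first_field) = obkv.get(0) {
            println!("{docid}: first field is {} bytes", first_field.len());
        }
    }
    Ok(())
}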

View File

@@ -20,6 +20,9 @@ use thiserror::Error;
pub use self::beu16_str_codec::BEU16StrCodec;
pub use self::beu32_str_codec::BEU32StrCodec;
pub use self::compressed_obkv_codec::{
CompressedKvReaderU16, CompressedKvWriterU16, ObkvCompressedCodec,
};
pub use self::field_id_word_count_codec::FieldIdWordCountCodec;
pub use self::fst_set_codec::FstSetCodec;
pub use self::obkv_codec::ObkvCodec;

View File

@@ -20,7 +20,8 @@ use crate::heed_codec::facet::{
FieldIdCodec, OrderedF64Codec,
};
use crate::heed_codec::{
BEU16StrCodec, FstSetCodec, ScriptLanguageCodec, StrBEU16Codec, StrRefCodec,
BEU16StrCodec, CompressedKvReaderU16, FstSetCodec, ObkvCompressedCodec, ScriptLanguageCodec,
StrBEU16Codec, StrRefCodec,
};
use crate::order_by_map::OrderByMap;
use crate::proximity::ProximityPrecision;
@@ -29,8 +30,8 @@ use crate::vector::{Embedding, EmbeddingConfig};
use crate::{
default_criteria, CboRoaringBitmapCodec, Criterion, DocumentId, ExternalDocumentsIds,
FacetDistribution, FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldIdWordCountCodec,
FieldidsWeightsMap, GeoPoint, ObkvCodec, Result, RoaringBitmapCodec, RoaringBitmapLenCodec,
Search, U8StrStrCodec, Weight, BEU16, BEU32, BEU64,
FieldidsWeightsMap, GeoPoint, Result, RoaringBitmapCodec, RoaringBitmapLenCodec, Search,
U8StrStrCodec, Weight, BEU16, BEU32, BEU64,
};
pub const DEFAULT_MIN_WORD_LEN_ONE_TYPO: u8 = 5;
@@ -73,6 +74,7 @@ pub mod main_key {
pub const PROXIMITY_PRECISION: &str = "proximity-precision";
pub const EMBEDDING_CONFIGS: &str = "embedding_configs";
pub const SEARCH_CUTOFF: &str = "search_cutoff";
pub const DOCUMENT_COMPRESSION_DICTIONARY: &str = "document-compression-dictionary";
}
pub mod db_name {
@@ -172,7 +174,7 @@ pub struct Index {
pub vector_arroy: arroy::Database<arroy::distances::Angular>,
/// Maps the document id to the document as an obkv store.
pub(crate) documents: Database<BEU32, ObkvCodec>,
pub(crate) documents: Database<BEU32, ObkvCompressedCodec>,
}
impl Index {
@@ -339,6 +341,29 @@ impl Index {
self.env.prepare_for_closing()
}
/* document compression dictionary */
/// Writes the dictionary that will later be used to compress the documents.
pub fn put_document_compression_dictionary(
&self,
wtxn: &mut RwTxn,
dictionary: &[u8],
) -> heed::Result<()> {
self.main.remap_types::<Str, Bytes>().put(
wtxn,
main_key::DOCUMENT_COMPRESSION_DICTIONARY,
dictionary,
)
}
/// Returns the optional dictionary to be used when reading the OBKV documents.
pub fn document_compression_dictionary<'t>(
&self,
rtxn: &'t RoTxn,
) -> heed::Result<Option<&'t [u8]>> {
self.main.remap_types::<Str, Bytes>().get(rtxn, main_key::DOCUMENT_COMPRESSION_DICTIONARY)
}
/* documents ids */
/// Writes the documents ids that correspond to the user-ids-documents-ids FST.
@@ -1261,36 +1286,36 @@ impl Index {
/* documents */
/// Returns an iterator over the requested documents. The next item will be an error if a document is missing.
pub fn iter_documents<'a, 't: 'a>(
/// Returns an iterator over the requested compressed documents. The next item will be an error if a document is missing.
pub fn iter_compressed_documents<'a, 't: 'a>(
&'a self,
rtxn: &'t RoTxn<'t>,
ids: impl IntoIterator<Item = DocumentId> + 'a,
) -> Result<impl Iterator<Item = Result<(DocumentId, obkv::KvReaderU16<'t>)>> + 'a> {
) -> Result<impl Iterator<Item = Result<(DocumentId, CompressedKvReaderU16<'t>)>> + 'a> {
Ok(ids.into_iter().map(move |id| {
let kv = self
let compressed = self
.documents
.get(rtxn, &id)?
.ok_or(UserError::UnknownInternalDocumentId { document_id: id })?;
Ok((id, kv))
Ok((id, compressed))
}))
}
/// Returns a [`Vec`] of the requested documents. Returns an error if a document is missing.
pub fn documents<'t>(
pub fn compressed_documents<'t>(
&self,
rtxn: &'t RoTxn<'t>,
ids: impl IntoIterator<Item = DocumentId>,
) -> Result<Vec<(DocumentId, obkv::KvReaderU16<'t>)>> {
self.iter_documents(rtxn, ids)?.collect()
) -> Result<Vec<(DocumentId, CompressedKvReaderU16<'t>)>> {
self.iter_compressed_documents(rtxn, ids)?.collect()
}
/// Returns an iterator over all the documents in the index.
pub fn all_documents<'a, 't: 'a>(
pub fn all_compressed_documents<'a, 't: 'a>(
&'a self,
rtxn: &'t RoTxn<'t>,
) -> Result<impl Iterator<Item = Result<(DocumentId, obkv::KvReaderU16<'t>)>> + 'a> {
self.iter_documents(rtxn, self.documents_ids(rtxn)?)
) -> Result<impl Iterator<Item = Result<(DocumentId, CompressedKvReaderU16<'t>)>> + 'a> {
self.iter_compressed_documents(rtxn, self.documents_ids(rtxn)?)
}
pub fn external_id_of<'a, 't: 'a>(
@@ -1311,8 +1336,15 @@ impl Index {
process: "external_id_of",
})
})?;
Ok(self.iter_documents(rtxn, ids)?.map(move |entry| -> Result<_> {
let (_docid, obkv) = entry?;
let dictionary = self.document_compression_dictionary(rtxn)?;
let mut buffer = Vec::new();
Ok(self.iter_compressed_documents(rtxn, ids)?.map(move |entry| -> Result<_> {
let (_docid, compressed_obkv) = entry?;
let obkv = match dictionary {
// TODO manage this unwrap correctly
Some(dict) => compressed_obkv.decompress_with(&mut buffer, dict).unwrap(),
None => compressed_obkv.as_non_compressed(),
};
match primary_key.document_id(&obkv, &fields)? {
Ok(document_id) => Ok(document_id),
Err(_) => Err(InternalError::DocumentsError(
@@ -2441,7 +2473,13 @@ pub(crate) mod tests {
"###);
let rtxn = index.read_txn().unwrap();
let (_docid, obkv) = index.documents(&rtxn, [0]).unwrap()[0];
let dictionary = index.document_compression_dictionary(&rtxn).unwrap();
let (_docid, compressed_obkv) = index.compressed_documents(&rtxn, [0]).unwrap()[0];
let mut buffer = Vec::new();
let obkv = match dictionary {
Some(dict) => compressed_obkv.decompress_with(&mut buffer, dict).unwrap(),
None => compressed_obkv.as_non_compressed(),
};
let json = obkv_to_json(&[0, 1, 2], &index.fields_ids_map(&rtxn).unwrap(), obkv).unwrap();
insta::assert_debug_snapshot!(json, @r###"
{
@@ -2450,7 +2488,11 @@ pub(crate) mod tests {
"###);
// Furthermore, when we retrieve document 34, it is not the result of merging 35 with 34
let (_docid, obkv) = index.documents(&rtxn, [2]).unwrap()[0];
let (_docid, compressed_obkv) = index.compressed_documents(&rtxn, [2]).unwrap()[0];
let obkv = match dictionary {
Some(dict) => compressed_obkv.decompress_with(&mut buffer, dict).unwrap(),
None => compressed_obkv.as_non_compressed(),
};
let json = obkv_to_json(&[0, 1, 2], &index.fields_ids_map(&rtxn).unwrap(), obkv).unwrap();
insta::assert_debug_snapshot!(json, @r###"
{
@@ -2657,7 +2699,7 @@ pub(crate) mod tests {
} = search.execute().unwrap();
let primary_key_id = index.fields_ids_map(&rtxn).unwrap().id("primary_key").unwrap();
documents_ids.sort_unstable();
let docs = index.documents(&rtxn, documents_ids).unwrap();
let docs = index.compressed_documents(&rtxn, documents_ids).unwrap();
let mut all_ids = HashSet::new();
for (_docid, obkv) in docs {
let id = obkv.get(primary_key_id).unwrap();
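
Aside: the commit stores and reads the dictionary but does not show how one is produced. Purely as an illustration (none of this is in the commit; the use of `zstd::dict::from_samples`, the 10 KiB target size, and the assumption that the crate's `Error` converts from `std::io::Error` are all assumptions), a dictionary could be trained from a sample of already-serialized documents and persisted with the new helper:

use crate::{Index, Result};

/// Hypothetical: trains a zstd dictionary from raw obkv documents and stores it.
fn train_and_store_dictionary(
    index: &Index,
    wtxn: &mut heed::RwTxn,
    samples: &[Vec<u8>], // uncompressed, already-serialized documents
) -> Result<()> {
    // 10 KiB is an arbitrary target size for this sketch.
    let dictionary = zstd::dict::from_samples(samples, 10 * 1024)?;
    index.put_document_compression_dictionary(wtxn, &dictionary)?;
    Ok(())
}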

View File

@@ -24,8 +24,14 @@ fn collect_field_values(
) -> Vec<String> {
let mut values = vec![];
let fid = index.fields_ids_map(txn).unwrap().id(fid).unwrap();
for doc in index.documents(txn, docids.iter().copied()).unwrap() {
if let Some(v) = doc.1.get(fid) {
let mut buffer = Vec::new();
let dictionary = index.document_compression_dictionary(txn).unwrap();
for (_id, compressed_doc) in index.compressed_documents(txn, docids.iter().copied()).unwrap() {
let doc = match dictionary {
Some(dict) => compressed_doc.decompress_with(&mut buffer, dict).unwrap(),
None => compressed_doc.as_non_compressed(),
};
if let Some(v) = doc.get(fid) {
let v: serde_json::Value = serde_json::from_slice(v).unwrap();
let v = v.to_string();
values.push(v);

View File

@@ -407,9 +407,16 @@ pub fn snap_documents(index: &Index) -> String {
let rtxn = index.read_txn().unwrap();
let fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
let display = fields_ids_map.ids().collect::<Vec<_>>();
let dictionary = index.document_compression_dictionary(&rtxn).unwrap();
let mut buffer = Vec::new();
for document in index.all_documents(&rtxn).unwrap() {
let doc = obkv_to_json(&display, &fields_ids_map, document.unwrap().1).unwrap();
for result in index.all_compressed_documents(&rtxn).unwrap() {
let (_id, compressed_document) = result.unwrap();
let document = match dictionary {
Some(dict) => compressed_document.decompress_with(&mut buffer, dict).unwrap(),
None => compressed_document.as_non_compressed(),
};
let doc = obkv_to_json(&display, &fields_ids_map, document).unwrap();
snap.push_str(&serde_json::to_string(&doc).unwrap());
snap.push('\n');
}

View File

@@ -834,7 +834,7 @@ mod tests {
let rtxn = index.read_txn().unwrap();
let count = index.number_of_documents(&rtxn).unwrap();
assert_eq!(count, 3);
let count = index.all_documents(&rtxn).unwrap().count();
let count = index.all_compressed_documents(&rtxn).unwrap().count();
assert_eq!(count, 3);
drop(rtxn);
@@ -861,7 +861,7 @@ mod tests {
assert_eq!(count, 1);
// Check that we get only one document from the database.
let docs = index.documents(&rtxn, Some(0)).unwrap();
let docs = index.compressed_documents(&rtxn, Some(0)).unwrap();
assert_eq!(docs.len(), 1);
let (id, doc) = docs[0];
assert_eq!(id, 0);
@@ -882,7 +882,7 @@ mod tests {
assert_eq!(count, 1);
// Check that we get only one document from the database.
let docs = index.documents(&rtxn, Some(0)).unwrap();
let docs = index.compressed_documents(&rtxn, Some(0)).unwrap();
assert_eq!(docs.len(), 1);
let (id, doc) = docs[0];
assert_eq!(id, 0);
@@ -932,7 +932,7 @@ mod tests {
let count = index.number_of_documents(&rtxn).unwrap();
assert_eq!(count, 3);
let docs = index.documents(&rtxn, vec![0, 1, 2]).unwrap();
let docs = index.compressed_documents(&rtxn, vec![0, 1, 2]).unwrap();
let (_id, obkv) = docs.iter().find(|(_id, kv)| kv.get(0) == Some(br#""kevin""#)).unwrap();
let kevin_uuid: String = serde_json::from_slice(obkv.get(1).unwrap()).unwrap();
drop(rtxn);
@@ -946,7 +946,7 @@ mod tests {
assert_eq!(count, 3);
// the document 0 has been deleted and reinserted with the id 3
let docs = index.documents(&rtxn, vec![1, 2, 0]).unwrap();
let docs = index.compressed_documents(&rtxn, vec![1, 2, 0]).unwrap();
let kevin_position =
docs.iter().position(|(_, d)| d.get(0).unwrap() == br#""updated kevin""#).unwrap();
assert_eq!(kevin_position, 2);
@@ -1088,7 +1088,7 @@ mod tests {
let rtxn = index.read_txn().unwrap();
let count = index.number_of_documents(&rtxn).unwrap();
assert_eq!(count, 6);
let count = index.all_documents(&rtxn).unwrap().count();
let count = index.all_compressed_documents(&rtxn).unwrap().count();
assert_eq!(count, 6);
db_snap!(index, word_docids, "updated");
@@ -1506,7 +1506,7 @@ mod tests {
index.add_documents(documents!({ "a" : { "b" : { "c" : 1 }}})).unwrap();
let rtxn = index.read_txn().unwrap();
let all_documents_count = index.all_documents(&rtxn).unwrap().count();
let all_documents_count = index.all_compressed_documents(&rtxn).unwrap().count();
assert_eq!(all_documents_count, 1);
let external_documents_ids = index.external_documents_ids();
assert!(external_documents_ids.get(&rtxn, "1").unwrap().is_some());
@@ -2796,7 +2796,7 @@ mod tests {
// Ensuring all the returned IDs actually exist
let rtxn = index.read_txn().unwrap();
let res = index.search(&rtxn).execute().unwrap();
index.documents(&rtxn, res.documents_ids).unwrap();
index.compressed_documents(&rtxn, res.documents_ids).unwrap();
}
fn delete_documents<'t>(
@@ -3163,7 +3163,7 @@ mod tests {
let deleted_internal_ids = delete_documents(&mut wtxn, &index, &deleted_external_ids);
// list all documents
let results = index.all_documents(&wtxn).unwrap();
let results = index.all_compressed_documents(&wtxn).unwrap();
for result in results {
let (id, _) = result.unwrap();
assert!(

View File

@@ -1035,15 +1035,24 @@ impl<'a, 'i> Transform<'a, 'i> {
if original_sorter.is_some() || flattened_sorter.is_some() {
let modified_faceted_fields = settings_diff.modified_faceted_fields();
let dictionary = self.index.document_compression_dictionary(wtxn)?;
let mut original_obkv_buffer = Vec::new();
let mut flattened_obkv_buffer = Vec::new();
let mut document_sorter_key_buffer = Vec::new();
let mut buffer = Vec::new();
for result in self.index.external_documents_ids().iter(wtxn)? {
let (external_id, docid) = result?;
let old_obkv = self.index.documents.get(wtxn, &docid)?.ok_or(
let old_compressed_obkv = self.index.documents.get(wtxn, &docid)?.ok_or(
InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
)?;
let old_obkv = match dictionary {
// TODO manage this unwrap correctly
Some(dict) => old_compressed_obkv.decompress_with(&mut buffer, dict).unwrap(),
None => old_compressed_obkv.as_non_compressed(),
};
let injected_vectors: std::result::Result<
serde_json::Map<String, serde_json::Value>,
arroy::Error,
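
Aside: the same `TODO manage this unwrap correctly` appears at both decompression call sites above. One possible follow-up, assuming `decompress_with` surfaces an `std::io::Error` and that the crate's `Error` type converts from it (both assumptions, not shown in this commit), is to propagate the error instead of unwrapping:

let old_obkv = match dictionary {
    // Propagate the decompression error instead of unwrapping.
    Some(dict) => old_compressed_obkv.decompress_with(&mut buffer, dict)?,
    None => old_compressed_obkv.as_non_compressed(),
};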

View File

@@ -1777,7 +1777,7 @@ mod tests {
// When we search for something that is in the searchable fields
// we must find the appropriate document.
let result = index.search(&rtxn).query(r#""kevin""#).execute().unwrap();
let documents = index.documents(&rtxn, result.documents_ids).unwrap();
let documents = index.compressed_documents(&rtxn, result.documents_ids).unwrap();
let fid_map = index.fields_ids_map(&rtxn).unwrap();
assert_eq!(documents.len(), 1);
assert_eq!(documents[0].1.get(fid_map.id("name").unwrap()), Some(&br#""kevin""#[..]));
@@ -1813,7 +1813,7 @@ mod tests {
snapshot!(format!("{searchable_fields:?}"), @r###"["id", "name", "age"]"###);
let result = index.search(&rtxn).query("23").execute().unwrap();
assert_eq!(result.documents_ids.len(), 1);
let documents = index.documents(&rtxn, result.documents_ids).unwrap();
let documents = index.compressed_documents(&rtxn, result.documents_ids).unwrap();
assert_eq!(documents[0].1.get(fid_map.id("name").unwrap()), Some(&br#""kevin""#[..]));
}
@@ -1954,7 +1954,7 @@ mod tests {
// Only count the field_id 0 and level 0 facet values.
// TODO we must support typed CSVs for numbers to be understood.
let fidmap = index.fields_ids_map(&rtxn).unwrap();
for document in index.all_documents(&rtxn).unwrap() {
for document in index.all_compressed_documents(&rtxn).unwrap() {
let document = document.unwrap();
let json = crate::obkv_to_json(&fidmap.ids().collect::<Vec<_>>(), &fidmap, document.1)
.unwrap();
@@ -2079,7 +2079,7 @@ mod tests {
// Run an empty query just to ensure that the search results are ordered.
let rtxn = index.read_txn().unwrap();
let SearchResult { documents_ids, .. } = index.search(&rtxn).execute().unwrap();
let documents = index.documents(&rtxn, documents_ids).unwrap();
let documents = index.compressed_documents(&rtxn, documents_ids).unwrap();
// Fetch the documents "age" field in the order in which the documents appear.
let age_field_id = index.fields_ids_map(&rtxn).unwrap().id("age").unwrap();
@@ -2512,7 +2512,7 @@ mod tests {
let rtxn = index.read_txn().unwrap();
let SearchResult { documents_ids, .. } = index.search(&rtxn).query("S").execute().unwrap();
let first_id = documents_ids[0];
let documents = index.documents(&rtxn, documents_ids).unwrap();
let documents = index.compressed_documents(&rtxn, documents_ids).unwrap();
let (_, content) = documents.iter().find(|(id, _)| *id == first_id).unwrap();
let fid = index.fields_ids_map(&rtxn).unwrap().id("title").unwrap();
@@ -2681,7 +2681,7 @@ mod tests {
wtxn.commit().unwrap();
let rtxn = index.write_txn().unwrap();
let docs: StdResult<Vec<_>, _> = index.all_documents(&rtxn).unwrap().collect();
let docs: StdResult<Vec<_>, _> = index.all_compressed_documents(&rtxn).unwrap().collect();
let docs = docs.unwrap();
assert_eq!(docs.len(), 5);
}