Compare commits

...

11 Commits

23 changed files with 1188 additions and 677 deletions

Cargo.lock (generated, 936 changed lines)
File diff suppressed because it is too large.

View File

@@ -110,7 +110,7 @@ fn main() {
                 // after executing a batch we check if the database is corrupted
                 let res = index.search(&wtxn).execute().unwrap();
-                index.documents(&wtxn, res.documents_ids).unwrap();
+                index.compressed_documents(&wtxn, res.documents_ids).unwrap();
                 progression.fetch_add(1, Ordering::Relaxed);
             }
             wtxn.abort();

View File

@@ -908,16 +908,22 @@ impl IndexScheduler {
                    let mut index_dumper = dump.create_index(uid, &metadata)?;

                    let fields_ids_map = index.fields_ids_map(&rtxn)?;
+                   let dictionary = index.document_decompression_dictionary(&rtxn)?;
                    let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
                    let embedding_configs = index.embedding_configs(&rtxn)?;
+                   let mut buffer = Vec::new();

                    // 3.1. Dump the documents
-                   for ret in index.all_documents(&rtxn)? {
+                   for ret in index.all_compressed_documents(&rtxn)? {
                        if self.must_stop_processing.get() {
                            return Err(Error::AbortedTask);
                        }

-                       let (id, doc) = ret?;
+                       let (id, compressed) = ret?;
+                       let doc = compressed.decompress_with_optional_dictionary(
+                           &mut buffer,
+                           dictionary.as_ref(),
+                       )?;
                        let mut document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc)?;
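
This fetch-then-decompress sequence is the pattern every read path in this compare follows: get the compressed OBKV, decompress it against the index's optional dictionary, then convert it with `obkv_to_json`. A minimal sketch of the pattern in isolation, reusing one scratch buffer across documents (the function name and its `serde_json` return type are illustrative, not part of the diff):

```rust
use meilisearch_types::heed::RoTxn;
use meilisearch_types::milli::{self, obkv_to_json, Index};

/// Illustrative only: dump every document of an index as JSON.
fn all_documents_as_json(index: &Index, rtxn: &RoTxn) -> milli::Result<Vec<serde_json::Value>> {
    // `None` as long as no compression dictionary has been stored for this index.
    let dictionary = index.document_decompression_dictionary(rtxn)?;
    let fields_ids_map = index.fields_ids_map(rtxn)?;
    let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
    let mut buffer = Vec::new();

    let mut documents = Vec::new();
    for ret in index.all_compressed_documents(rtxn)? {
        let (_docid, compressed) = ret?;
        // Falls back to reading the stored OBKV as-is when there is no dictionary yet.
        let doc = compressed.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())?;
        documents.push(serde_json::Value::Object(obkv_to_json(&all_fields, &fields_ids_map, doc)?));
    }
    Ok(documents)
}
```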

View File

@ -2465,12 +2465,20 @@ mod tests {
let index = index_scheduler.index("doggos").unwrap(); let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>(); let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index let documents = index
.all_documents(&rtxn) .all_compressed_documents(&rtxn)
.unwrap() .unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .map(|ret| {
let (_id, compressed_doc) = ret.unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
obkv_to_json(&field_ids, &field_ids_map, doc).unwrap()
})
.collect::<Vec<_>>(); .collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
} }
@ -2525,12 +2533,20 @@ mod tests {
let index = index_scheduler.index("doggos").unwrap(); let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>(); let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index let documents = index
.all_documents(&rtxn) .all_compressed_documents(&rtxn)
.unwrap() .unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .map(|ret| {
let (_id, compressed_doc) = ret.unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
obkv_to_json(&field_ids, &field_ids_map, doc).unwrap()
})
.collect::<Vec<_>>(); .collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
} }
@ -2904,12 +2920,20 @@ mod tests {
// has everything being pushed successfully in milli? // has everything being pushed successfully in milli?
let index = index_scheduler.index("doggos").unwrap(); let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>(); let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index let documents = index
.all_documents(&rtxn) .all_compressed_documents(&rtxn)
.unwrap() .unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .map(|ret| {
let (_id, compressed_doc) = ret.unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
obkv_to_json(&field_ids, &field_ids_map, doc).unwrap()
})
.collect::<Vec<_>>(); .collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
} }
@ -2955,12 +2979,20 @@ mod tests {
// has everything being pushed successfully in milli? // has everything being pushed successfully in milli?
let index = index_scheduler.index("doggos").unwrap(); let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>(); let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index let documents = index
.all_documents(&rtxn) .all_compressed_documents(&rtxn)
.unwrap() .unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .map(|ret| {
let (_id, compressed_doc) = ret.unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
obkv_to_json(&field_ids, &field_ids_map, doc).unwrap()
})
.collect::<Vec<_>>(); .collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
} }
@ -3011,12 +3043,20 @@ mod tests {
// has everything being pushed successfully in milli? // has everything being pushed successfully in milli?
let index = index_scheduler.index("doggos").unwrap(); let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>(); let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index let documents = index
.all_documents(&rtxn) .all_compressed_documents(&rtxn)
.unwrap() .unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .map(|ret| {
let (_id, compressed_doc) = ret.unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
obkv_to_json(&field_ids, &field_ids_map, doc).unwrap()
})
.collect::<Vec<_>>(); .collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
} }
@ -3129,12 +3169,20 @@ mod tests {
// has everything being pushed successfully in milli? // has everything being pushed successfully in milli?
let index = index_scheduler.index("doggos").unwrap(); let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>(); let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index let documents = index
.all_documents(&rtxn) .all_compressed_documents(&rtxn)
.unwrap() .unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .map(|ret| {
let (_id, compressed_doc) = ret.unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
obkv_to_json(&field_ids, &field_ids_map, doc).unwrap()
})
.collect::<Vec<_>>(); .collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
} }
@ -3184,12 +3232,20 @@ mod tests {
// has everything being pushed successfully in milli? // has everything being pushed successfully in milli?
let index = index_scheduler.index("doggos").unwrap(); let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>(); let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index let documents = index
.all_documents(&rtxn) .all_compressed_documents(&rtxn)
.unwrap() .unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .map(|ret| {
let (_id, compressed_doc) = ret.unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
obkv_to_json(&field_ids, &field_ids_map, doc).unwrap()
})
.collect::<Vec<_>>(); .collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
} }
@ -3898,12 +3954,20 @@ mod tests {
// Has everything being pushed successfully in milli? // Has everything being pushed successfully in milli?
let index = index_scheduler.index("doggos").unwrap(); let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>(); let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index let documents = index
.all_documents(&rtxn) .all_compressed_documents(&rtxn)
.unwrap() .unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .map(|ret| {
let (_id, compressed_doc) = ret.unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
obkv_to_json(&field_ids, &field_ids_map, doc).unwrap()
})
.collect::<Vec<_>>(); .collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
} }
@ -3969,12 +4033,20 @@ mod tests {
// Has everything being pushed successfully in milli? // Has everything being pushed successfully in milli?
let index = index_scheduler.index("doggos").unwrap(); let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>(); let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index let documents = index
.all_documents(&rtxn) .all_compressed_documents(&rtxn)
.unwrap() .unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .map(|ret| {
let (_id, compressed_doc) = ret.unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
obkv_to_json(&field_ids, &field_ids_map, doc).unwrap()
})
.collect::<Vec<_>>(); .collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
} }
@ -4037,12 +4109,20 @@ mod tests {
// Has everything being pushed successfully in milli? // Has everything being pushed successfully in milli?
let index = index_scheduler.index("doggos").unwrap(); let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>(); let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index let documents = index
.all_documents(&rtxn) .all_compressed_documents(&rtxn)
.unwrap() .unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .map(|ret| {
let (_id, compressed_doc) = ret.unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
obkv_to_json(&field_ids, &field_ids_map, doc).unwrap()
})
.collect::<Vec<_>>(); .collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
} }
@ -4098,12 +4178,20 @@ mod tests {
// Has everything being pushed successfully in milli? // Has everything being pushed successfully in milli?
let index = index_scheduler.index("doggos").unwrap(); let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>(); let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index let documents = index
.all_documents(&rtxn) .all_compressed_documents(&rtxn)
.unwrap() .unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .map(|ret| {
let (_id, compressed_doc) = ret.unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
obkv_to_json(&field_ids, &field_ids_map, doc).unwrap()
})
.collect::<Vec<_>>(); .collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
} }
@ -4159,6 +4247,8 @@ mod tests {
// Is the primary key still what we expect? // Is the primary key still what we expect?
let index = index_scheduler.index("doggos").unwrap(); let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let primary_key = index.primary_key(&rtxn).unwrap().unwrap(); let primary_key = index.primary_key(&rtxn).unwrap().unwrap();
snapshot!(primary_key, @"id"); snapshot!(primary_key, @"id");
@ -4166,9 +4256,15 @@ mod tests {
let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>(); let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index let documents = index
.all_documents(&rtxn) .all_compressed_documents(&rtxn)
.unwrap() .unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .map(|ret| {
let (_id, compressed_doc) = ret.unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
obkv_to_json(&field_ids, &field_ids_map, doc).unwrap()
})
.collect::<Vec<_>>(); .collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
} }
@ -4220,6 +4316,8 @@ mod tests {
// Is the primary key still what we expect? // Is the primary key still what we expect?
let index = index_scheduler.index("doggos").unwrap(); let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let primary_key = index.primary_key(&rtxn).unwrap().unwrap(); let primary_key = index.primary_key(&rtxn).unwrap().unwrap();
snapshot!(primary_key, @"id"); snapshot!(primary_key, @"id");
@ -4227,9 +4325,15 @@ mod tests {
let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>(); let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index let documents = index
.all_documents(&rtxn) .all_compressed_documents(&rtxn)
.unwrap() .unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .map(|ret| {
let (_id, compressed_doc) = ret.unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
obkv_to_json(&field_ids, &field_ids_map, doc).unwrap()
})
.collect::<Vec<_>>(); .collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
} }
@ -4303,6 +4407,8 @@ mod tests {
// Is the primary key still what we expect? // Is the primary key still what we expect?
let index = index_scheduler.index("doggos").unwrap(); let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let primary_key = index.primary_key(&rtxn).unwrap().unwrap(); let primary_key = index.primary_key(&rtxn).unwrap().unwrap();
snapshot!(primary_key, @"id"); snapshot!(primary_key, @"id");
@ -4310,9 +4416,15 @@ mod tests {
let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>(); let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index let documents = index
.all_documents(&rtxn) .all_compressed_documents(&rtxn)
.unwrap() .unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .map(|ret| {
let (_id, compressed_doc) = ret.unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
obkv_to_json(&field_ids, &field_ids_map, doc).unwrap()
})
.collect::<Vec<_>>(); .collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
} }
@ -4389,6 +4501,8 @@ mod tests {
// Is the primary key still what we expect? // Is the primary key still what we expect?
let index = index_scheduler.index("doggos").unwrap(); let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let primary_key = index.primary_key(&rtxn).unwrap().unwrap(); let primary_key = index.primary_key(&rtxn).unwrap().unwrap();
snapshot!(primary_key, @"paw"); snapshot!(primary_key, @"paw");
@ -4396,9 +4510,15 @@ mod tests {
let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>(); let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index let documents = index
.all_documents(&rtxn) .all_compressed_documents(&rtxn)
.unwrap() .unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .map(|ret| {
let (_id, compressed_doc) = ret.unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
obkv_to_json(&field_ids, &field_ids_map, doc).unwrap()
})
.collect::<Vec<_>>(); .collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
} }
@ -4468,6 +4588,8 @@ mod tests {
// Is the primary key still what we expect? // Is the primary key still what we expect?
let index = index_scheduler.index("doggos").unwrap(); let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let primary_key = index.primary_key(&rtxn).unwrap().unwrap(); let primary_key = index.primary_key(&rtxn).unwrap().unwrap();
snapshot!(primary_key, @"doggoid"); snapshot!(primary_key, @"doggoid");
@ -4475,9 +4597,15 @@ mod tests {
let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>(); let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index let documents = index
.all_documents(&rtxn) .all_compressed_documents(&rtxn)
.unwrap() .unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .map(|ret| {
let (_id, compressed_doc) = ret.unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
obkv_to_json(&field_ids, &field_ids_map, doc).unwrap()
})
.collect::<Vec<_>>(); .collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
} }
@ -5120,6 +5248,8 @@ mod tests {
{ {
let index = index_scheduler.index("doggos").unwrap(); let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
// Ensure the document has been inserted into the relevant bitmap
let configs = index.embedding_configs(&rtxn).unwrap(); let configs = index.embedding_configs(&rtxn).unwrap();
@ -5139,8 +5269,12 @@ mod tests {
assert_json_snapshot!(embeddings[&simple_hf_name][0] == lab_embed, @"true"); assert_json_snapshot!(embeddings[&simple_hf_name][0] == lab_embed, @"true");
assert_json_snapshot!(embeddings[&fakerest_name][0] == beagle_embed, @"true"); assert_json_snapshot!(embeddings[&fakerest_name][0] == beagle_embed, @"true");
let doc = index.documents(&rtxn, std::iter::once(0)).unwrap()[0].1; let (_id, compressed_doc) =
index.compressed_documents(&rtxn, std::iter::once(0)).unwrap().remove(0);
let fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); let fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
let doc = obkv_to_json( let doc = obkv_to_json(
&[ &[
fields_ids_map.id("doggo").unwrap(), fields_ids_map.id("doggo").unwrap(),
@ -5194,6 +5328,8 @@ mod tests {
{ {
let index = index_scheduler.index("doggos").unwrap(); let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
// Ensure the document has been inserted into the relevant bitmap
let configs = index.embedding_configs(&rtxn).unwrap(); let configs = index.embedding_configs(&rtxn).unwrap();
@ -5216,8 +5352,12 @@ mod tests {
// remained beagle // remained beagle
assert_json_snapshot!(embeddings[&fakerest_name][0] == beagle_embed, @"true"); assert_json_snapshot!(embeddings[&fakerest_name][0] == beagle_embed, @"true");
let doc = index.documents(&rtxn, std::iter::once(0)).unwrap()[0].1; let (_id, compressed_doc) =
index.compressed_documents(&rtxn, std::iter::once(0)).unwrap().remove(0);
let fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); let fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
let doc = obkv_to_json( let doc = obkv_to_json(
&[ &[
fields_ids_map.id("doggo").unwrap(), fields_ids_map.id("doggo").unwrap(),
@ -5309,12 +5449,20 @@ mod tests {
let index = index_scheduler.index("doggos").unwrap(); let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>(); let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index let documents = index
.all_documents(&rtxn) .all_compressed_documents(&rtxn)
.unwrap() .unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .map(|ret| {
let (_id, compressed_doc) = ret.unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
obkv_to_json(&field_ids, &field_ids_map, doc).unwrap()
})
.collect::<Vec<_>>(); .collect::<Vec<_>>();
snapshot!(serde_json::to_string(&documents).unwrap(), name: "documents after initial push"); snapshot!(serde_json::to_string(&documents).unwrap(), name: "documents after initial push");
@ -5348,12 +5496,20 @@ mod tests {
let index = index_scheduler.index("doggos").unwrap(); let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>(); let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index let documents = index
.all_documents(&rtxn) .all_compressed_documents(&rtxn)
.unwrap() .unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .map(|ret| {
let (_id, compressed_doc) = ret.unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
obkv_to_json(&field_ids, &field_ids_map, doc).unwrap()
})
.collect::<Vec<_>>(); .collect::<Vec<_>>();
// all the vectors linked to the newly specified embedder have been removed
// Only the unknown embedders stay in the document DB
@ -5456,9 +5612,15 @@ mod tests {
// the document with the id 3 should have its original embedding updated // the document with the id 3 should have its original embedding updated
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let docid = index.external_documents_ids.get(&rtxn, "3").unwrap().unwrap(); let docid = index.external_documents_ids.get(&rtxn, "3").unwrap().unwrap();
let doc = index.documents(&rtxn, Some(docid)).unwrap()[0]; let (_id, compressed_doc) =
let doc = obkv_to_json(&field_ids, &field_ids_map, doc.1).unwrap(); index.compressed_documents(&rtxn, Some(docid)).unwrap().remove(0);
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
let doc = obkv_to_json(&field_ids, &field_ids_map, doc).unwrap();
snapshot!(json_string!(doc), @r###" snapshot!(json_string!(doc), @r###"
{ {
"id": 3, "id": 3,
@ -5570,12 +5732,20 @@ mod tests {
let index = index_scheduler.index("doggos").unwrap(); let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>(); let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index let documents = index
.all_documents(&rtxn) .all_compressed_documents(&rtxn)
.unwrap() .unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .map(|ret| {
let (_id, compressed_doc) = ret.unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
obkv_to_json(&field_ids, &field_ids_map, doc).unwrap()
})
.collect::<Vec<_>>(); .collect::<Vec<_>>();
snapshot!(serde_json::to_string(&documents).unwrap(), @r###"[{"id":0,"doggo":"kefir"}]"###); snapshot!(serde_json::to_string(&documents).unwrap(), @r###"[{"id":0,"doggo":"kefir"}]"###);
let conf = index.embedding_configs(&rtxn).unwrap(); let conf = index.embedding_configs(&rtxn).unwrap();
@ -5610,12 +5780,20 @@ mod tests {
let index = index_scheduler.index("doggos").unwrap(); let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>(); let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index let documents = index
.all_documents(&rtxn) .all_compressed_documents(&rtxn)
.unwrap() .unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .map(|ret| {
let (_id, compressed_doc) = ret.unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
obkv_to_json(&field_ids, &field_ids_map, doc).unwrap()
})
.collect::<Vec<_>>(); .collect::<Vec<_>>();
snapshot!(serde_json::to_string(&documents).unwrap(), @"[]"); snapshot!(serde_json::to_string(&documents).unwrap(), @"[]");
let conf = index.embedding_configs(&rtxn).unwrap(); let conf = index.embedding_configs(&rtxn).unwrap();
@ -5726,12 +5904,20 @@ mod tests {
{ {
let index = index_scheduler.index("doggos").unwrap(); let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>(); let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index let documents = index
.all_documents(&rtxn) .all_compressed_documents(&rtxn)
.unwrap() .unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .map(|ret| {
let (_id, compressed_doc) = ret.unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
obkv_to_json(&field_ids, &field_ids_map, doc).unwrap()
})
.collect::<Vec<_>>(); .collect::<Vec<_>>();
snapshot!(serde_json::to_string(&documents).unwrap(), @r###"[{"id":0,"doggo":"kefir"},{"id":1,"doggo":"intel"}]"###); snapshot!(serde_json::to_string(&documents).unwrap(), @r###"[{"id":0,"doggo":"kefir"},{"id":1,"doggo":"intel"}]"###);
} }
@ -5761,12 +5947,20 @@ mod tests {
{ {
let index = index_scheduler.index("doggos").unwrap(); let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>(); let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index let documents = index
.all_documents(&rtxn) .all_compressed_documents(&rtxn)
.unwrap() .unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .map(|ret| {
let (_id, compressed_doc) = ret.unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
obkv_to_json(&field_ids, &field_ids_map, doc).unwrap()
})
.collect::<Vec<_>>(); .collect::<Vec<_>>();
snapshot!(serde_json::to_string(&documents).unwrap(), @r###"[{"id":0,"doggo":"kefir","_vectors":{"manual":{"embeddings":[[0.0,0.0,0.0]],"regenerate":false}}},{"id":1,"doggo":"intel","_vectors":{"manual":{"embeddings":[[1.0,1.0,1.0]],"regenerate":false}}}]"###); snapshot!(serde_json::to_string(&documents).unwrap(), @r###"[{"id":0,"doggo":"kefir","_vectors":{"manual":{"embeddings":[[0.0,0.0,0.0]],"regenerate":false}}},{"id":1,"doggo":"intel","_vectors":{"manual":{"embeddings":[[1.0,1.0,1.0]],"regenerate":false}}}]"###);
} }
@ -5794,12 +5988,20 @@ mod tests {
{ {
let index = index_scheduler.index("doggos").unwrap(); let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>(); let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index let documents = index
.all_documents(&rtxn) .all_compressed_documents(&rtxn)
.unwrap() .unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) .map(|ret| {
let (_id, compressed_doc) = ret.unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
obkv_to_json(&field_ids, &field_ids_map, doc).unwrap()
})
.collect::<Vec<_>>(); .collect::<Vec<_>>();
// FIXME: redaction // FIXME: redaction
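
Every snapshot test above repeats the same eight-line decompression closure. A possible follow-up, not part of this diff, would be a small helper in the test module that hides the dictionary and buffer handling; a sketch (the helper name is hypothetical):

```rust
use meilisearch_types::heed::RoTxn;
use meilisearch_types::milli::{obkv_to_json, Index};

/// Hypothetical test helper: all documents of an index as JSON objects.
fn decompressed_documents_json(index: &Index, rtxn: &RoTxn) -> Vec<serde_json::Value> {
    let dictionary = index.document_decompression_dictionary(rtxn).unwrap();
    let field_ids_map = index.fields_ids_map(rtxn).unwrap();
    let field_ids: Vec<_> = field_ids_map.ids().collect();
    let mut buffer = Vec::new();
    index
        .all_compressed_documents(rtxn)
        .unwrap()
        .map(|ret| {
            let (_id, compressed_doc) = ret.unwrap();
            let doc = compressed_doc
                .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
                .unwrap();
            serde_json::Value::Object(obkv_to_json(&field_ids, &field_ids_map, doc).unwrap())
        })
        .collect()
}
```

Each test body would then shrink to `let documents = decompressed_documents_json(&index, &rtxn);` before the `snapshot!` call.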

View File

@@ -12,7 +12,7 @@ pub mod star_or;
 pub mod task_view;
 pub mod tasks;
 pub mod versioning;
-pub use milli::{heed, Index};
+pub use milli::{heed, zstd, Index};
 use uuid::Uuid;
 pub use versioning::VERSION_FILE_NAME;
 pub use {milli, serde_cs};

View File

@@ -125,7 +125,7 @@ reqwest = { version = "0.12.5", features = [
 sha-1 = { version = "0.10.1", optional = true }
 static-files = { version = "0.2.4", optional = true }
 tempfile = { version = "3.10.1", optional = true }
-zip = { version = "2.1.3", optional = true }
+zip = { version = "2.1.3", default-features = false, features = ["deflate"], optional = true }

 [features]
 default = ["analytics", "meilisearch-types/all-tokenizations", "mini-dashboard"]

View File

@@ -603,44 +603,51 @@ fn some_documents<'a, 't: 'a>(
     retrieve_vectors: RetrieveVectors,
 ) -> Result<impl Iterator<Item = Result<Document, ResponseError>> + 'a, ResponseError> {
     let fields_ids_map = index.fields_ids_map(rtxn)?;
+    let dictionary = index.document_decompression_dictionary(rtxn)?;
     let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
     let embedding_configs = index.embedding_configs(rtxn)?;
+    let mut buffer = Vec::new();

-    Ok(index.iter_documents(rtxn, doc_ids)?.map(move |ret| {
-        ret.map_err(ResponseError::from).and_then(|(key, document)| -> Result<_, ResponseError> {
-            let mut document = milli::obkv_to_json(&all_fields, &fields_ids_map, document)?;
-            match retrieve_vectors {
-                RetrieveVectors::Ignore => {}
-                RetrieveVectors::Hide => {
-                    document.remove("_vectors");
-                }
-                RetrieveVectors::Retrieve => {
-                    // Clippy is simply wrong
-                    #[allow(clippy::manual_unwrap_or_default)]
-                    let mut vectors = match document.remove("_vectors") {
-                        Some(Value::Object(map)) => map,
-                        _ => Default::default(),
-                    };
-                    for (name, vector) in index.embeddings(rtxn, key)? {
-                        let user_provided = embedding_configs
-                            .iter()
-                            .find(|conf| conf.name == name)
-                            .is_some_and(|conf| conf.user_provided.contains(key));
-                        let embeddings = ExplicitVectors {
-                            embeddings: Some(vector.into()),
-                            regenerate: !user_provided,
-                        };
-                        vectors.insert(
-                            name,
-                            serde_json::to_value(embeddings).map_err(MeilisearchHttpError::from)?,
-                        );
-                    }
-                    document.insert("_vectors".into(), vectors.into());
-                }
-            }
-            Ok(document)
-        })
+    Ok(index.iter_compressed_documents(rtxn, doc_ids)?.map(move |ret| {
+        ret.map_err(ResponseError::from).and_then(
+            |(key, compressed_document)| -> Result<_, ResponseError> {
+                let document = compressed_document
+                    .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())?;
+                let mut document = milli::obkv_to_json(&all_fields, &fields_ids_map, document)?;
+                match retrieve_vectors {
+                    RetrieveVectors::Ignore => {}
+                    RetrieveVectors::Hide => {
+                        document.remove("_vectors");
+                    }
+                    RetrieveVectors::Retrieve => {
+                        // Clippy is simply wrong
+                        #[allow(clippy::manual_unwrap_or_default)]
+                        let mut vectors = match document.remove("_vectors") {
+                            Some(Value::Object(map)) => map,
+                            _ => Default::default(),
+                        };
+                        for (name, vector) in index.embeddings(rtxn, key)? {
+                            let user_provided = embedding_configs
+                                .iter()
+                                .find(|conf| conf.name == name)
+                                .is_some_and(|conf| conf.user_provided.contains(key));
+                            let embeddings = ExplicitVectors {
+                                embeddings: Some(vector.into()),
+                                regenerate: !user_provided,
+                            };
+                            vectors.insert(
+                                name,
+                                serde_json::to_value(embeddings)
+                                    .map_err(MeilisearchHttpError::from)?,
+                            );
+                        }
+                        document.insert("_vectors".into(), vectors.into());
+                    }
+                }
+                Ok(document)
+            },
+        )
     }))
 }

View File

@@ -1123,10 +1123,16 @@ fn make_hits(
     formatter_builder.crop_marker(format.crop_marker);
     formatter_builder.highlight_prefix(format.highlight_pre_tag);
     formatter_builder.highlight_suffix(format.highlight_post_tag);
+    let decompression_dictionary = index.document_decompression_dictionary(rtxn)?;
+    let mut buffer = Vec::new();
     let mut documents = Vec::new();
     let embedding_configs = index.embedding_configs(rtxn)?;
-    let documents_iter = index.documents(rtxn, documents_ids)?;
-    for ((id, obkv), score) in documents_iter.into_iter().zip(document_scores.into_iter()) {
+    let documents_iter = index.compressed_documents(rtxn, documents_ids)?;
+    for ((id, compressed), score) in documents_iter.into_iter().zip(document_scores.into_iter()) {
+        let obkv = compressed
+            .decompress_with_optional_dictionary(&mut buffer, decompression_dictionary.as_ref())
+            // TODO use a better error?
+            .map_err(|e| MeilisearchHttpError::HeedError(e.into()))?;
         // First generate a document with all the displayed fields
         let displayed_document = make_document(&displayed_ids, &fields_ids_map, obkv)?;

View File

@@ -260,6 +260,7 @@ fn export_a_dump(
    // 4. Dump the indexes
    let mut count = 0;
+   let mut buffer = Vec::new();
    for result in index_mapping.iter(&rtxn)? {
        let (uid, uuid) = result?;
        let index_path = db_path.join("indexes").join(uuid.to_string());
@@ -268,6 +269,7 @@ fn export_a_dump(
        })?;

        let rtxn = index.read_txn()?;
+       let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
        let metadata = IndexMetadata {
            uid: uid.to_owned(),
            primary_key: index.primary_key(&rtxn)?.map(String::from),
@@ -280,8 +282,11 @@ fn export_a_dump(
        let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();

        // 4.1. Dump the documents
-       for ret in index.all_documents(&rtxn)? {
-           let (_id, doc) = ret?;
+       for ret in index.all_compressed_documents(&rtxn)? {
+           let (_id, compressed_doc) = ret?;
+           let doc = compressed_doc
+               .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
+               .unwrap();
            let document = obkv_to_json(&all_fields, &fields_ids_map, doc)?;
            index_dumper.push_document(&document)?;
        }

View File

@@ -38,6 +38,7 @@ heed = { version = "0.20.3", default-features = false, features = [
 indexmap = { version = "2.2.6", features = ["serde"] }
 json-depth-checker = { path = "../json-depth-checker" }
 levenshtein_automata = { version = "0.2.1", features = ["fst_automaton"] }
+zstd = { version = "0.13.1", features = ["zdict_builder", "experimental"] }
 memmap2 = "0.9.4"
 obkv = "0.2.2"
 once_cell = "1.19.0"
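
The `zdict_builder` feature enabled on this new `zstd` dependency exposes the dictionary-training API that presumably produces the raw bytes stored through `put_document_compression_dictionary` further down. A rough sketch of training such a dictionary from a sample of serialized documents (the sample selection and the 64 KiB size cap are assumptions, not taken from this diff):

```rust
use std::io;

/// Illustrative only: train a zstd dictionary from already-serialized OBKV documents.
fn train_document_dictionary(samples: &[Vec<u8>]) -> io::Result<Vec<u8>> {
    // 64 KiB is a common upper bound for zstd dictionaries; the real value is an open choice.
    const MAX_DICTIONARY_SIZE: usize = 64 * 1024;
    // `from_samples` is only available with the `zdict_builder` feature enabled above.
    zstd::dict::from_samples(samples, MAX_DICTIONARY_SIZE)
}
```

The resulting bytes could then be persisted with `Index::put_document_compression_dictionary` and turned into the `EncoderDictionary`/`DecoderDictionary` handles shown in `index.rs` below.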

View File

@ -30,6 +30,7 @@ fn main() -> Result<(), Box<dyn Error>> {
let index = Index::new(options, dataset)?; let index = Index::new(options, dataset)?;
let txn = index.read_txn()?; let txn = index.read_txn()?;
let dictionary = index.document_decompression_dictionary(&txn).unwrap();
let mut query = String::new(); let mut query = String::new();
while stdin().read_line(&mut query)? > 0 { while stdin().read_line(&mut query)? > 0 {
for _ in 0..2 { for _ in 0..2 {
@ -49,6 +50,7 @@ fn main() -> Result<(), Box<dyn Error>> {
let start = Instant::now(); let start = Instant::now();
let mut ctx = SearchContext::new(&index, &txn)?; let mut ctx = SearchContext::new(&index, &txn)?;
let mut buffer = Vec::new();
let universe = filtered_universe(ctx.index, ctx.txn, &None)?; let universe = filtered_universe(ctx.index, ctx.txn, &None)?;
let docs = execute_search( let docs = execute_search(
@ -75,11 +77,14 @@ fn main() -> Result<(), Box<dyn Error>> {
let elapsed = start.elapsed(); let elapsed = start.elapsed();
println!("new: {}us, docids: {:?}", elapsed.as_micros(), docs.documents_ids); println!("new: {}us, docids: {:?}", elapsed.as_micros(), docs.documents_ids);
if print_documents { if print_documents {
let documents = index let compressed_documents = index
.documents(&txn, docs.documents_ids.iter().copied()) .compressed_documents(&txn, docs.documents_ids.iter().copied())
.unwrap() .unwrap()
.into_iter() .into_iter()
.map(|(id, obkv)| { .map(|(id, compressed_obkv)| {
let obkv = compressed_obkv
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
let mut object = serde_json::Map::default(); let mut object = serde_json::Map::default();
for (fid, fid_name) in index.fields_ids_map(&txn).unwrap().iter() { for (fid, fid_name) in index.fields_ids_map(&txn).unwrap().iter() {
let value = obkv.get(fid).unwrap(); let value = obkv.get(fid).unwrap();
@ -90,17 +95,20 @@ fn main() -> Result<(), Box<dyn Error>> {
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
for (id, document) in documents { for (id, document) in compressed_documents {
println!("{id}:"); println!("{id}:");
println!("{document}"); println!("{document}");
} }
let documents = index let compressed_documents = index
.documents(&txn, docs.documents_ids.iter().copied()) .compressed_documents(&txn, docs.documents_ids.iter().copied())
.unwrap() .unwrap()
.into_iter() .into_iter()
.map(|(id, obkv)| { .map(|(id, compressed_obkv)| {
let mut object = serde_json::Map::default(); let mut object = serde_json::Map::default();
let obkv = compressed_obkv
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
for (fid, fid_name) in index.fields_ids_map(&txn).unwrap().iter() { for (fid, fid_name) in index.fields_ids_map(&txn).unwrap().iter() {
let value = obkv.get(fid).unwrap(); let value = obkv.get(fid).unwrap();
let value: serde_json::Value = serde_json::from_slice(value).unwrap(); let value: serde_json::Value = serde_json::from_slice(value).unwrap();
@ -110,7 +118,7 @@ fn main() -> Result<(), Box<dyn Error>> {
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
println!("{}us: {:?}", elapsed.as_micros(), docs.documents_ids); println!("{}us: {:?}", elapsed.as_micros(), docs.documents_ids);
for (id, document) in documents { for (id, document) in compressed_documents {
println!("{id}:"); println!("{id}:");
println!("{document}"); println!("{document}");
} }

View File

@@ -0,0 +1,89 @@
use std::borrow::Cow;
use std::io;
use std::io::ErrorKind;

use heed::BoxedError;
use obkv::KvReaderU16;
use zstd::bulk::{Compressor, Decompressor};
use zstd::dict::{DecoderDictionary, EncoderDictionary};

pub struct CompressedObkvCodec;

impl<'a> heed::BytesDecode<'a> for CompressedObkvCodec {
    type DItem = CompressedKvReaderU16<'a>;

    fn bytes_decode(bytes: &'a [u8]) -> Result<Self::DItem, BoxedError> {
        Ok(CompressedKvReaderU16(bytes))
    }
}

impl heed::BytesEncode<'_> for CompressedObkvCodec {
    type EItem = CompressedKvWriterU16;

    fn bytes_encode(item: &Self::EItem) -> Result<Cow<[u8]>, BoxedError> {
        Ok(Cow::Borrowed(&item.0))
    }
}

pub struct CompressedKvReaderU16<'a>(&'a [u8]);

impl<'a> CompressedKvReaderU16<'a> {
    /// Decompresses the KvReader into the buffer using the provided dictionary.
    pub fn decompress_with<'b>(
        &self,
        buffer: &'b mut Vec<u8>,
        dictionary: &DecoderDictionary,
    ) -> io::Result<KvReaderU16<'b>> {
        const TWO_GIGABYTES: usize = 2 * 1024 * 1024 * 1024;

        let mut decompressor = Decompressor::with_prepared_dictionary(dictionary)?;
        let mut max_size = self.0.len() * 4;
        let size = loop {
            buffer.resize(max_size, 0);
            match decompressor.decompress_to_buffer(self.0, &mut buffer[..max_size]) {
                Ok(size) => break size,
                // The bulk decompressor reports a too-small destination buffer as a generic
                // `Other` error, so grow the buffer and retry, capped at 2 GiB.
                // TODO don't do that !!! But what should I do?
                Err(e) if e.kind() == ErrorKind::Other && max_size <= TWO_GIGABYTES => {
                    max_size *= 2
                }
                Err(e) => return Err(e),
            }
        };
        Ok(KvReaderU16::new(&buffer[..size]))
    }

    /// Returns the KvReader as if it were not compressed.
    /// Happens when there is no dictionary yet.
    pub fn as_non_compressed(&self) -> KvReaderU16<'a> {
        KvReaderU16::new(self.0)
    }

    /// Decompresses this KvReader if necessary.
    pub fn decompress_with_optional_dictionary<'b>(
        &self,
        buffer: &'b mut Vec<u8>,
        dictionary: Option<&DecoderDictionary>,
    ) -> io::Result<KvReaderU16<'b>>
    where
        'a: 'b,
    {
        match dictionary {
            Some(dict) => self.decompress_with(buffer, dict),
            None => Ok(self.as_non_compressed()),
        }
    }
}

pub struct CompressedKvWriterU16(Vec<u8>);

impl CompressedKvWriterU16 {
    // TODO ask for a KvReaderU16 here
    pub fn new_with_dictionary(input: &[u8], dictionary: &EncoderDictionary) -> io::Result<Self> {
        let mut compressor = Compressor::with_prepared_dictionary(dictionary)?;
        compressor.compress(input).map(CompressedKvWriterU16)
    }

    pub fn as_bytes(&self) -> &[u8] {
        &self.0
    }
}
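
Together the writer and reader halves round-trip a document through zstd with a shared dictionary. A minimal usage sketch, assuming the codec stays publicly reachable under `milli::heed_codec` and that trained dictionary bytes are provided by the caller:

```rust
use milli::heed::BytesDecode;
use milli::heed_codec::{CompressedKvWriterU16, CompressedObkvCodec};
use milli::zstd::dict::{DecoderDictionary, EncoderDictionary};

/// Illustrative only: compress a raw OBKV payload and read it back.
fn roundtrip(raw_obkv: &[u8], dictionary_bytes: &[u8]) -> std::io::Result<()> {
    // The PR builds its encoder dictionaries with compression level 19 (see index.rs below).
    let encoder_dict = EncoderDictionary::copy(dictionary_bytes, 19);
    let decoder_dict = DecoderDictionary::copy(dictionary_bytes);

    // This is the value that ends up in the `documents` database.
    let compressed = CompressedKvWriterU16::new_with_dictionary(raw_obkv, &encoder_dict)?;

    // Reads go through the codec and hand back a lazy reader over the compressed bytes.
    let reader = CompressedObkvCodec::bytes_decode(compressed.as_bytes())
        .expect("bytes_decode on a compressed obkv never fails");

    // Decompression reuses a caller-provided scratch buffer.
    let mut buffer = Vec::new();
    let obkv = reader.decompress_with_optional_dictionary(&mut buffer, Some(&decoder_dict))?;
    let _first_field = obkv.get(0); // the result is an ordinary obkv::KvReaderU16 again
    Ok(())
}
```

`decompress_with_optional_dictionary` is the entry point every call site in this compare uses, since an index that has never stored a dictionary keeps its documents uncompressed.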

View File

@@ -1,6 +1,7 @@
 mod beu16_str_codec;
 mod beu32_str_codec;
 mod byte_slice_ref;
+mod compressed_obkv_codec;
 pub mod facet;
 mod field_id_word_count_codec;
 mod fst_set_codec;
@@ -19,6 +20,9 @@ use thiserror::Error;
 pub use self::beu16_str_codec::BEU16StrCodec;
 pub use self::beu32_str_codec::BEU32StrCodec;
+pub use self::compressed_obkv_codec::{
+    CompressedKvReaderU16, CompressedKvWriterU16, CompressedObkvCodec,
+};
 pub use self::field_id_word_count_codec::FieldIdWordCountCodec;
 pub use self::fst_set_codec::FstSetCodec;
 pub use self::obkv_codec::ObkvCodec;

View File

@@ -11,6 +11,7 @@ use roaring::RoaringBitmap;
 use rstar::RTree;
 use serde::{Deserialize, Serialize};
 use time::OffsetDateTime;
+use zstd::dict::{DecoderDictionary, EncoderDictionary};

 use crate::documents::PrimaryKey;
 use crate::error::{InternalError, UserError};
@@ -20,7 +21,8 @@ use crate::heed_codec::facet::{
     FieldIdCodec, OrderedF64Codec,
 };
 use crate::heed_codec::{
-    BEU16StrCodec, FstSetCodec, ScriptLanguageCodec, StrBEU16Codec, StrRefCodec,
+    BEU16StrCodec, CompressedKvReaderU16, CompressedObkvCodec, FstSetCodec, ScriptLanguageCodec,
+    StrBEU16Codec, StrRefCodec,
 };
 use crate::order_by_map::OrderByMap;
 use crate::proximity::ProximityPrecision;
@@ -29,8 +31,8 @@ use crate::vector::{Embedding, EmbeddingConfig};
 use crate::{
     default_criteria, CboRoaringBitmapCodec, Criterion, DocumentId, ExternalDocumentsIds,
     FacetDistribution, FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldIdWordCountCodec,
-    FieldidsWeightsMap, GeoPoint, ObkvCodec, Result, RoaringBitmapCodec, RoaringBitmapLenCodec,
-    Search, U8StrStrCodec, Weight, BEU16, BEU32, BEU64,
+    FieldidsWeightsMap, GeoPoint, Result, RoaringBitmapCodec, RoaringBitmapLenCodec, Search,
+    U8StrStrCodec, Weight, BEU16, BEU32, BEU64,
 };

 pub const DEFAULT_MIN_WORD_LEN_ONE_TYPO: u8 = 5;
@@ -73,6 +75,7 @@ pub mod main_key {
     pub const PROXIMITY_PRECISION: &str = "proximity-precision";
     pub const EMBEDDING_CONFIGS: &str = "embedding_configs";
     pub const SEARCH_CUTOFF: &str = "search_cutoff";
+    pub const DOCUMENT_COMPRESSION_DICTIONARY: &str = "document-compression-dictionary";
 }

 pub mod db_name {
@@ -172,7 +175,7 @@ pub struct Index {
     pub vector_arroy: arroy::Database<arroy::distances::Angular>,

     /// Maps the document id to the document as an obkv store.
-    pub(crate) documents: Database<BEU32, ObkvCodec>,
+    pub(crate) documents: Database<BEU32, CompressedObkvCodec>,
 }

 impl Index {
@@ -339,6 +342,50 @@ impl Index {
         self.env.prepare_for_closing()
     }
/* document compression dictionary */
/// Writes the dictionary that will later be used to compress the documents.
pub fn put_document_compression_dictionary(
&self,
wtxn: &mut RwTxn,
dictionary: &[u8],
) -> heed::Result<()> {
self.main.remap_types::<Str, Bytes>().put(
wtxn,
main_key::DOCUMENT_COMPRESSION_DICTIONARY,
dictionary,
)
}
/// Deletes the document compression dictionary.
pub fn delete_document_compression_dictionary(&self, wtxn: &mut RwTxn) -> heed::Result<bool> {
self.main.remap_key_type::<Str>().delete(wtxn, main_key::DOCUMENT_COMPRESSION_DICTIONARY)
}
/// Returns the optional raw bytes dictionary to be used when reading or writing the OBKV documents.
pub fn document_compression_raw_dictionary<'t>(
&self,
rtxn: &'t RoTxn,
) -> heed::Result<Option<&'t [u8]>> {
self.main.remap_types::<Str, Bytes>().get(rtxn, main_key::DOCUMENT_COMPRESSION_DICTIONARY)
}
pub fn document_decompression_dictionary<'t>(
&self,
rtxn: &'t RoTxn,
) -> heed::Result<Option<DecoderDictionary<'t>>> {
self.document_compression_raw_dictionary(rtxn).map(|opt| opt.map(DecoderDictionary::new))
}
pub fn document_compression_dictionary(
&self,
rtxn: &RoTxn,
) -> heed::Result<Option<EncoderDictionary<'static>>> {
const COMPRESSION_LEVEL: i32 = 19;
self.document_compression_raw_dictionary(rtxn)
.map(|opt| opt.map(|bytes| EncoderDictionary::copy(bytes, COMPRESSION_LEVEL)))
}
/* documents ids */ /* documents ids */
/// Writes the documents ids that corresponds to the user-ids-documents-ids FST. /// Writes the documents ids that corresponds to the user-ids-documents-ids FST.
@ -1261,36 +1308,36 @@ impl Index {
/* documents */ /* documents */
/// Returns an iterator over the requested documents. The next item will be an error if a document is missing. /// Returns an iterator over the requested compressed documents. The next item will be an error if a document is missing.
pub fn iter_documents<'a, 't: 'a>( pub fn iter_compressed_documents<'a, 't: 'a>(
&'a self, &'a self,
rtxn: &'t RoTxn<'t>, rtxn: &'t RoTxn<'t>,
ids: impl IntoIterator<Item = DocumentId> + 'a, ids: impl IntoIterator<Item = DocumentId> + 'a,
) -> Result<impl Iterator<Item = Result<(DocumentId, obkv::KvReaderU16<'t>)>> + 'a> { ) -> Result<impl Iterator<Item = Result<(DocumentId, CompressedKvReaderU16<'t>)>> + 'a> {
Ok(ids.into_iter().map(move |id| { Ok(ids.into_iter().map(move |id| {
let kv = self let compressed = self
.documents .documents
.get(rtxn, &id)? .get(rtxn, &id)?
.ok_or(UserError::UnknownInternalDocumentId { document_id: id })?; .ok_or(UserError::UnknownInternalDocumentId { document_id: id })?;
Ok((id, kv)) Ok((id, compressed))
})) }))
} }
/// Returns a [`Vec`] of the requested documents. Returns an error if a document is missing. /// Returns a [`Vec`] of the requested documents. Returns an error if a document is missing.
pub fn documents<'t>( pub fn compressed_documents<'t>(
&self, &self,
rtxn: &'t RoTxn<'t>, rtxn: &'t RoTxn<'t>,
ids: impl IntoIterator<Item = DocumentId>, ids: impl IntoIterator<Item = DocumentId>,
) -> Result<Vec<(DocumentId, obkv::KvReaderU16<'t>)>> { ) -> Result<Vec<(DocumentId, CompressedKvReaderU16<'t>)>> {
self.iter_documents(rtxn, ids)?.collect() self.iter_compressed_documents(rtxn, ids)?.collect()
} }
/// Returns an iterator over all the documents in the index. /// Returns an iterator over all the documents in the index.
pub fn all_documents<'a, 't: 'a>( pub fn all_compressed_documents<'a, 't: 'a>(
&'a self, &'a self,
rtxn: &'t RoTxn<'t>, rtxn: &'t RoTxn<'t>,
) -> Result<impl Iterator<Item = Result<(DocumentId, obkv::KvReaderU16<'t>)>> + 'a> { ) -> Result<impl Iterator<Item = Result<(DocumentId, CompressedKvReaderU16<'t>)>> + 'a> {
self.iter_documents(rtxn, self.documents_ids(rtxn)?) self.iter_compressed_documents(rtxn, self.documents_ids(rtxn)?)
} }
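
Reading a document is now a two-step operation: fetch the compressed entry, then decompress it into a caller-provided buffer together with the optional dictionary. Below is a minimal usage sketch of the pattern relied on throughout this diff; `print_all_documents` is a hypothetical helper and error handling is reduced to `?`.

fn print_all_documents(index: &Index, rtxn: &RoTxn) -> Result<()> {
    // Fetch the (optional) decompression dictionary once per transaction.
    let dictionary = index.document_decompression_dictionary(rtxn)?;
    // A single scratch buffer is reused for every document.
    let mut buffer = Vec::new();
    for entry in index.all_compressed_documents(rtxn)? {
        let (docid, compressed) = entry?;
        // The decompressed OBKV borrows from `buffer`, so it must be consumed
        // before the next iteration overwrites it.
        let obkv = compressed
            .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())?;
        println!("{docid}: {} fields", obkv.iter().count());
    }
    Ok(())
}
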
pub fn external_id_of<'a, 't: 'a>( pub fn external_id_of<'a, 't: 'a>(
@ -1311,8 +1358,13 @@ impl Index {
process: "external_id_of", process: "external_id_of",
}) })
})?; })?;
Ok(self.iter_documents(rtxn, ids)?.map(move |entry| -> Result<_> { let dictionary =
let (_docid, obkv) = entry?; self.document_compression_raw_dictionary(rtxn)?.map(DecoderDictionary::copy);
let mut buffer = Vec::new();
Ok(self.iter_compressed_documents(rtxn, ids)?.map(move |entry| -> Result<_> {
let (_docid, compressed_obkv) = entry?;
let obkv = compressed_obkv
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())?;
match primary_key.document_id(&obkv, &fields)? { match primary_key.document_id(&obkv, &fields)? {
Ok(document_id) => Ok(document_id), Ok(document_id) => Ok(document_id),
Err(_) => Err(InternalError::DocumentsError( Err(_) => Err(InternalError::DocumentsError(
@ -2441,7 +2493,12 @@ pub(crate) mod tests {
"###); "###);
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let (_docid, obkv) = index.documents(&rtxn, [0]).unwrap()[0]; let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let (_docid, compressed_obkv) = index.compressed_documents(&rtxn, [0]).unwrap().remove(0);
let mut buffer = Vec::new();
let obkv = compressed_obkv
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
let json = obkv_to_json(&[0, 1, 2], &index.fields_ids_map(&rtxn).unwrap(), obkv).unwrap(); let json = obkv_to_json(&[0, 1, 2], &index.fields_ids_map(&rtxn).unwrap(), obkv).unwrap();
insta::assert_debug_snapshot!(json, @r###" insta::assert_debug_snapshot!(json, @r###"
{ {
@ -2450,7 +2507,10 @@ pub(crate) mod tests {
"###); "###);
// Furthermore, when we retrieve document 34, it is not the result of merging 35 with 34 // Furthermore, when we retrieve document 34, it is not the result of merging 35 with 34
let (_docid, obkv) = index.documents(&rtxn, [2]).unwrap()[0]; let (_docid, compressed_obkv) = index.compressed_documents(&rtxn, [2]).unwrap().remove(0);
let obkv = compressed_obkv
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
let json = obkv_to_json(&[0, 1, 2], &index.fields_ids_map(&rtxn).unwrap(), obkv).unwrap(); let json = obkv_to_json(&[0, 1, 2], &index.fields_ids_map(&rtxn).unwrap(), obkv).unwrap();
insta::assert_debug_snapshot!(json, @r###" insta::assert_debug_snapshot!(json, @r###"
{ {
@ -2459,6 +2519,7 @@ pub(crate) mod tests {
} }
"###); "###);
drop(dictionary);
drop(rtxn); drop(rtxn);
// Add new documents again // Add new documents again
@ -2657,11 +2718,16 @@ pub(crate) mod tests {
} = search.execute().unwrap(); } = search.execute().unwrap();
let primary_key_id = index.fields_ids_map(&rtxn).unwrap().id("primary_key").unwrap(); let primary_key_id = index.fields_ids_map(&rtxn).unwrap().id("primary_key").unwrap();
documents_ids.sort_unstable(); documents_ids.sort_unstable();
let docs = index.documents(&rtxn, documents_ids).unwrap(); let compressed_docs = index.compressed_documents(&rtxn, documents_ids).unwrap();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let mut buffer = Vec::new();
let mut all_ids = HashSet::new(); let mut all_ids = HashSet::new();
for (_docid, obkv) in docs { for (_docid, compressed) in compressed_docs {
let id = obkv.get(primary_key_id).unwrap(); let doc = compressed
assert!(all_ids.insert(id)); .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
let id = doc.get(primary_key_id).unwrap();
assert!(all_ids.insert(id.to_vec()));
} }
} }

View File

@ -45,7 +45,7 @@ pub use search::new::{
}; };
use serde_json::Value; use serde_json::Value;
pub use thread_pool_no_abort::{PanicCatched, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder}; pub use thread_pool_no_abort::{PanicCatched, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
pub use {charabia as tokenizer, heed}; pub use {charabia as tokenizer, heed, zstd};
pub use self::asc_desc::{AscDesc, AscDescError, Member, SortError}; pub use self::asc_desc::{AscDesc, AscDescError, Member, SortError};
pub use self::criterion::{default_criteria, Criterion, CriterionError}; pub use self::criterion::{default_criteria, Criterion, CriterionError};

View File

@ -24,8 +24,13 @@ fn collect_field_values(
) -> Vec<String> { ) -> Vec<String> {
let mut values = vec![]; let mut values = vec![];
let fid = index.fields_ids_map(txn).unwrap().id(fid).unwrap(); let fid = index.fields_ids_map(txn).unwrap().id(fid).unwrap();
for doc in index.documents(txn, docids.iter().copied()).unwrap() { let mut buffer = Vec::new();
if let Some(v) = doc.1.get(fid) { let dictionary = index.document_decompression_dictionary(txn).unwrap();
for (_id, compressed_doc) in index.compressed_documents(txn, docids.iter().copied()).unwrap() {
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
if let Some(v) = doc.get(fid) {
let v: serde_json::Value = serde_json::from_slice(v).unwrap(); let v: serde_json::Value = serde_json::from_slice(v).unwrap();
let v = v.to_string(); let v = v.to_string();
values.push(v); values.push(v);

View File

@ -407,9 +407,15 @@ pub fn snap_documents(index: &Index) -> String {
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); let fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
let display = fields_ids_map.ids().collect::<Vec<_>>(); let display = fields_ids_map.ids().collect::<Vec<_>>();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let mut buffer = Vec::new();
for document in index.all_documents(&rtxn).unwrap() { for result in index.all_compressed_documents(&rtxn).unwrap() {
let doc = obkv_to_json(&display, &fields_ids_map, document.unwrap().1).unwrap(); let (_id, compressed_document) = result.unwrap();
let document = compressed_document
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
let doc = obkv_to_json(&display, &fields_ids_map, document).unwrap();
snap.push_str(&serde_json::to_string(&doc).unwrap()); snap.push_str(&serde_json::to_string(&doc).unwrap());
snap.push('\n'); snap.push('\n');
} }

View File

@ -63,6 +63,7 @@ impl<'t, 'i> ClearDocuments<'t, 'i> {
self.index.put_field_distribution(self.wtxn, &FieldDistribution::default())?; self.index.put_field_distribution(self.wtxn, &FieldDistribution::default())?;
self.index.delete_geo_rtree(self.wtxn)?; self.index.delete_geo_rtree(self.wtxn)?;
self.index.delete_geo_faceted_documents_ids(self.wtxn)?; self.index.delete_geo_faceted_documents_ids(self.wtxn)?;
self.index.delete_document_compression_dictionary(self.wtxn)?;
// Remove all user-provided bits from the configs // Remove all user-provided bits from the configs
let mut configs = self.index.embedding_configs(self.wtxn)?; let mut configs = self.index.embedding_configs(self.wtxn)?;

View File

@ -5,7 +5,7 @@ mod transform;
mod typed_chunk; mod typed_chunk;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::io::{Read, Seek}; use std::io::{BufWriter, Read, Seek, Write};
use std::iter; use std::iter;
use std::num::NonZeroU32; use std::num::NonZeroU32;
use std::result::Result as StdResult; use std::result::Result as StdResult;
@ -13,8 +13,8 @@ use std::sync::Arc;
use crossbeam_channel::{Receiver, Sender}; use crossbeam_channel::{Receiver, Sender};
use grenad::{Merger, MergerBuilder}; use grenad::{Merger, MergerBuilder};
use heed::types::Str; use heed::types::{Bytes, Str};
use heed::Database; use heed::{Database, PutFlags};
use rand::SeedableRng; use rand::SeedableRng;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -34,13 +34,14 @@ use self::helpers::{grenad_obkv_into_chunks, GrenadParameters};
pub use self::transform::{Transform, TransformOutput}; pub use self::transform::{Transform, TransformOutput};
use crate::documents::{obkv_to_object, DocumentsBatchReader}; use crate::documents::{obkv_to_object, DocumentsBatchReader};
use crate::error::{Error, InternalError, UserError}; use crate::error::{Error, InternalError, UserError};
use crate::heed_codec::{CompressedKvWriterU16, CompressedObkvCodec};
use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder; use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder;
pub use crate::update::index_documents::helpers::CursorClonableMmap; pub use crate::update::index_documents::helpers::CursorClonableMmap;
use crate::update::{ use crate::update::{
IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst, IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst,
}; };
use crate::vector::EmbeddingConfigs; use crate::vector::EmbeddingConfigs;
use crate::{CboRoaringBitmapCodec, Index, Result}; use crate::{CboRoaringBitmapCodec, Index, Result, BEU32};
static MERGED_DATABASE_COUNT: usize = 7; static MERGED_DATABASE_COUNT: usize = 7;
static PREFIX_DATABASE_COUNT: usize = 4; static PREFIX_DATABASE_COUNT: usize = 4;
@ -266,7 +267,7 @@ where
target = "indexing::details", target = "indexing::details",
name = "index_documents_raw" name = "index_documents_raw"
)] )]
pub fn execute_raw(self, output: TransformOutput) -> Result<u64> pub fn execute_raw(mut self, output: TransformOutput) -> Result<u64>
where where
FP: Fn(UpdateIndexingStep) + Sync, FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync, FA: Fn() -> bool + Sync,
@ -565,6 +566,10 @@ where
word_fid_docids.map(MergerBuilder::build), word_fid_docids.map(MergerBuilder::build),
)?; )?;
// This call checks internally whether a compression dictionary must be generated:
// it only builds one once enough documents are stored and none exists yet, and it
// then recompresses the documents already in the database.
self.manage_compression_dictionary()?;
Ok(number_of_documents) Ok(number_of_documents)
} }
@ -575,7 +580,7 @@ where
name = "index_documents_prefix_databases" name = "index_documents_prefix_databases"
)] )]
pub fn execute_prefix_databases( pub fn execute_prefix_databases(
self, &mut self,
word_docids: Option<Merger<CursorClonableMmap, MergeFn>>, word_docids: Option<Merger<CursorClonableMmap, MergeFn>>,
exact_word_docids: Option<Merger<CursorClonableMmap, MergeFn>>, exact_word_docids: Option<Merger<CursorClonableMmap, MergeFn>>,
word_position_docids: Option<Merger<CursorClonableMmap, MergeFn>>, word_position_docids: Option<Merger<CursorClonableMmap, MergeFn>>,
@ -747,6 +752,64 @@ where
Ok(()) Ok(())
} }
/// Computes a new dictionary and compresses the documents already in the database with it.
///
/// Documents written afterwards must still be compressed at write time whenever a dictionary exists;
/// this method only builds the dictionary and recompresses the documents currently stored.
#[tracing::instrument(
level = "trace",
skip_all,
target = "indexing::compression",
name = "compress_documents_database"
)]
pub fn manage_compression_dictionary(&mut self) -> Result<()> {
/// The size of the dictionary generated from a sample of the documents already
/// in the database. It will be used when compressing and decompressing documents.
const COMPRESSION_DICTIONARY_SIZE: usize = 64_000;
/// The minimum number of documents to trigger the generation of the compression dictionary.
const COMPRESSION_ON_NUMBER_OF_DOCUMENTS: usize = 10_000;
if self.index.number_of_documents(self.wtxn)? < COMPRESSION_ON_NUMBER_OF_DOCUMENTS as u64
|| self.index.document_compression_dictionary(self.wtxn)?.is_some()
{
return Ok(());
}
let mut sample_file = tempfile::tempfile().map(BufWriter::new)?;
let mut sample_sizes = Vec::new();
// Sample at most COMPRESSION_ON_NUMBER_OF_DOCUMENTS documents to build the dictionary.
let documents = self.index.documents.remap_types::<BEU32, Bytes>();
for result in documents.iter(self.wtxn)?.take(COMPRESSION_ON_NUMBER_OF_DOCUMENTS) {
let (_id, bytes) = result?;
sample_file.write_all(bytes)?;
sample_sizes.push(bytes.len());
}
let sample_file = sample_file.into_inner().map_err(|ie| ie.into_error())?;
let sample_data = unsafe { memmap2::Mmap::map(&sample_file)? };
let dictionary =
zstd::dict::from_continuous(&sample_data, &sample_sizes, COMPRESSION_DICTIONARY_SIZE)?;
self.index.put_document_compression_dictionary(self.wtxn, &dictionary)?;
// safety: We just set the dictionary above. It must be there when we get it back.
let dictionary = self.index.document_compression_dictionary(self.wtxn)?.unwrap();
let mut iter = self.index.documents.iter_mut(self.wtxn)?;
while let Some(result) = iter.next() {
let (docid, document) = result?;
let document = document.as_non_compressed().as_bytes();
let compressed = CompressedKvWriterU16::new_with_dictionary(document, &dictionary)?;
// safety: the compressed document is entirely owned
unsafe {
iter.put_current_with_options::<CompressedObkvCodec>(
PutFlags::empty(),
&docid,
&compressed,
)?;
}
}
Ok(())
}
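
In other words, the dictionary is trained at most once, from the first COMPRESSION_ON_NUMBER_OF_DOCUMENTS documents already stored, and every stored document is then rewritten in compressed form. The following is a simplified sketch of the training step using in-memory samples instead of the temporary file and mmap; `train_dictionary` is illustrative only, and zstd needs a reasonably large and varied sample set for `from_continuous` to succeed.

use std::io;

/// Build a 64 KB zstd dictionary from in-memory document samples.
fn train_dictionary(samples: &[Vec<u8>]) -> io::Result<Vec<u8>> {
    // `from_continuous` expects all samples concatenated back to back,
    // plus the length of each individual sample.
    let sizes: Vec<usize> = samples.iter().map(Vec::len).collect();
    let continuous: Vec<u8> = samples.concat();
    zstd::dict::from_continuous(&continuous, &sizes, 64_000)
}
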
} }
/// Run the word prefix docids update operation. /// Run the word prefix docids update operation.
@ -834,7 +897,7 @@ mod tests {
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let count = index.number_of_documents(&rtxn).unwrap(); let count = index.number_of_documents(&rtxn).unwrap();
assert_eq!(count, 3); assert_eq!(count, 3);
let count = index.all_documents(&rtxn).unwrap().count(); let count = index.all_compressed_documents(&rtxn).unwrap().count();
assert_eq!(count, 3); assert_eq!(count, 3);
drop(rtxn); drop(rtxn);
@ -843,6 +906,7 @@ mod tests {
#[test] #[test]
fn simple_document_merge() { fn simple_document_merge() {
let mut index = TempIndex::new(); let mut index = TempIndex::new();
let mut buffer = Vec::new();
index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments; index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments;
// First we send 3 documents with duplicate ids and // First we send 3 documents with duplicate ids and
@ -861,16 +925,21 @@ mod tests {
assert_eq!(count, 1); assert_eq!(count, 1);
// Check that we get only one document from the database. // Check that we get only one document from the database.
let docs = index.documents(&rtxn, Some(0)).unwrap(); let mut compressed_docs = index.compressed_documents(&rtxn, Some(0)).unwrap();
assert_eq!(docs.len(), 1); assert_eq!(compressed_docs.len(), 1);
let (id, doc) = docs[0]; let (id, compressed_doc) = compressed_docs.remove(0);
assert_eq!(id, 0); assert_eq!(id, 0);
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
// Check that this document is equal to the last one sent. // Check that this document is equal to the last one sent.
let mut doc_iter = doc.iter(); let mut doc_iter = doc.iter();
assert_eq!(doc_iter.next(), Some((0, &b"1"[..]))); assert_eq!(doc_iter.next(), Some((0, &b"1"[..])));
assert_eq!(doc_iter.next(), Some((1, &br#""benoit""#[..]))); assert_eq!(doc_iter.next(), Some((1, &br#""benoit""#[..])));
assert_eq!(doc_iter.next(), None); assert_eq!(doc_iter.next(), None);
drop(dictionary);
drop(rtxn); drop(rtxn);
// Second we send 1 document with id 1, to force it to be merged with the previous one. // Second we send 1 document with id 1, to force it to be merged with the previous one.
@ -882,10 +951,14 @@ mod tests {
assert_eq!(count, 1); assert_eq!(count, 1);
// Check that we get only one document from the database. // Check that we get only one document from the database.
let docs = index.documents(&rtxn, Some(0)).unwrap(); let mut compressed_docs = index.compressed_documents(&rtxn, Some(0)).unwrap();
assert_eq!(docs.len(), 1); assert_eq!(compressed_docs.len(), 1);
let (id, doc) = docs[0]; let (id, compressed_doc) = compressed_docs.remove(0);
assert_eq!(id, 0); assert_eq!(id, 0);
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
// Check that this document is equal to the last one sent. // Check that this document is equal to the last one sent.
let mut doc_iter = doc.iter(); let mut doc_iter = doc.iter();
@ -893,6 +966,7 @@ mod tests {
assert_eq!(doc_iter.next(), Some((1, &br#""benoit""#[..]))); assert_eq!(doc_iter.next(), Some((1, &br#""benoit""#[..])));
assert_eq!(doc_iter.next(), Some((2, &b"25"[..]))); assert_eq!(doc_iter.next(), Some((2, &b"25"[..])));
assert_eq!(doc_iter.next(), None); assert_eq!(doc_iter.next(), None);
drop(dictionary);
drop(rtxn); drop(rtxn);
} }
@ -917,6 +991,7 @@ mod tests {
#[test] #[test]
fn simple_auto_generated_documents_ids() { fn simple_auto_generated_documents_ids() {
let mut index = TempIndex::new(); let mut index = TempIndex::new();
let mut buffer = Vec::new();
index.index_documents_config.autogenerate_docids = true; index.index_documents_config.autogenerate_docids = true;
// First we send 3 documents with ids from 1 to 3. // First we send 3 documents with ids from 1 to 3.
index index
@ -929,12 +1004,26 @@ mod tests {
// Check that there is 3 documents now. // Check that there is 3 documents now.
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let count = index.number_of_documents(&rtxn).unwrap(); let count = index.number_of_documents(&rtxn).unwrap();
assert_eq!(count, 3); assert_eq!(count, 3);
let docs = index.documents(&rtxn, vec![0, 1, 2]).unwrap(); let compressed_docs = index.compressed_documents(&rtxn, vec![0, 1, 2]).unwrap();
let (_id, obkv) = docs.iter().find(|(_id, kv)| kv.get(0) == Some(br#""kevin""#)).unwrap(); let (_id, compressed_obkv) = compressed_docs
.iter()
.find(|(_id, compressed_doc)| {
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
doc.get(0) == Some(br#""kevin""#)
})
.unwrap();
let obkv = compressed_obkv
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
let kevin_uuid: String = serde_json::from_slice(obkv.get(1).unwrap()).unwrap(); let kevin_uuid: String = serde_json::from_slice(obkv.get(1).unwrap()).unwrap();
drop(dictionary);
drop(rtxn); drop(rtxn);
// Second we send 1 document with the generated uuid, to erase the previous ones. // Second we send 1 document with the generated uuid, to erase the previous ones.
@ -942,21 +1031,34 @@ mod tests {
// Check that there is **always** 3 documents. // Check that there is **always** 3 documents.
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let count = index.number_of_documents(&rtxn).unwrap(); let count = index.number_of_documents(&rtxn).unwrap();
assert_eq!(count, 3); assert_eq!(count, 3);
// the document 0 has been deleted and reinserted with the id 3 // the document 0 has been deleted and reinserted with the id 3
let docs = index.documents(&rtxn, vec![1, 2, 0]).unwrap(); let mut compressed_docs = index.compressed_documents(&rtxn, vec![1, 2, 0]).unwrap();
let kevin_position = let kevin_position = compressed_docs
docs.iter().position(|(_, d)| d.get(0).unwrap() == br#""updated kevin""#).unwrap(); .iter()
.position(|(_, compressed_doc)| {
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
doc.get(0).unwrap() == br#""updated kevin""#
})
.unwrap();
assert_eq!(kevin_position, 2); assert_eq!(kevin_position, 2);
let (_, doc) = docs[kevin_position]; let (_, compressed_doc) = compressed_docs.remove(kevin_position);
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
// Check that this document is equal to the last // Check that this document is equal to the last
// one sent and that an UUID has been generated. // one sent and that an UUID has been generated.
assert_eq!(doc.get(0), Some(&br#""updated kevin""#[..])); assert_eq!(doc.get(0), Some(&br#""updated kevin""#[..]));
// This is a UUID, it must be 36 bytes long plus the 2 surrounding string quotes ("). // This is a UUID, it must be 36 bytes long plus the 2 surrounding string quotes (").
assert_eq!(doc.get(1).unwrap().len(), 36 + 2); assert_eq!(doc.get(1).unwrap().len(), 36 + 2);
drop(dictionary);
drop(rtxn); drop(rtxn);
} }
@ -1088,7 +1190,7 @@ mod tests {
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let count = index.number_of_documents(&rtxn).unwrap(); let count = index.number_of_documents(&rtxn).unwrap();
assert_eq!(count, 6); assert_eq!(count, 6);
let count = index.all_documents(&rtxn).unwrap().count(); let count = index.all_compressed_documents(&rtxn).unwrap().count();
assert_eq!(count, 6); assert_eq!(count, 6);
db_snap!(index, word_docids, "updated"); db_snap!(index, word_docids, "updated");
@ -1506,7 +1608,7 @@ mod tests {
index.add_documents(documents!({ "a" : { "b" : { "c" : 1 }}})).unwrap(); index.add_documents(documents!({ "a" : { "b" : { "c" : 1 }}})).unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let all_documents_count = index.all_documents(&rtxn).unwrap().count(); let all_documents_count = index.all_compressed_documents(&rtxn).unwrap().count();
assert_eq!(all_documents_count, 1); assert_eq!(all_documents_count, 1);
let external_documents_ids = index.external_documents_ids(); let external_documents_ids = index.external_documents_ids();
assert!(external_documents_ids.get(&rtxn, "1").unwrap().is_some()); assert!(external_documents_ids.get(&rtxn, "1").unwrap().is_some());
@ -2796,7 +2898,7 @@ mod tests {
// Ensuring all the returned IDs actually exists // Ensuring all the returned IDs actually exists
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let res = index.search(&rtxn).execute().unwrap(); let res = index.search(&rtxn).execute().unwrap();
index.documents(&rtxn, res.documents_ids).unwrap(); index.compressed_documents(&rtxn, res.documents_ids).unwrap();
} }
fn delete_documents<'t>( fn delete_documents<'t>(
@ -3163,7 +3265,7 @@ mod tests {
let deleted_internal_ids = delete_documents(&mut wtxn, &index, &deleted_external_ids); let deleted_internal_ids = delete_documents(&mut wtxn, &index, &deleted_external_ids);
// list all documents // list all documents
let results = index.all_documents(&wtxn).unwrap(); let results = index.all_compressed_documents(&wtxn).unwrap();
for result in results { for result in results {
let (id, _) = result.unwrap(); let (id, _) = result.unwrap();
assert!( assert!(

View File

@ -168,10 +168,12 @@ impl<'a, 'i> Transform<'a, 'i> {
let external_documents_ids = self.index.external_documents_ids(); let external_documents_ids = self.index.external_documents_ids();
let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?; let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?;
let dictionary = self.index.document_decompression_dictionary(wtxn)?;
let primary_key = cursor.primary_key().to_string(); let primary_key = cursor.primary_key().to_string();
let primary_key_id = let primary_key_id =
self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?; self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?;
let mut decompression_buffer = Vec::new();
let mut obkv_buffer = Vec::new(); let mut obkv_buffer = Vec::new();
let mut document_sorter_value_buffer = Vec::new(); let mut document_sorter_value_buffer = Vec::new();
let mut document_sorter_key_buffer = Vec::new(); let mut document_sorter_key_buffer = Vec::new();
@ -247,18 +249,17 @@ impl<'a, 'i> Transform<'a, 'i> {
let mut skip_insertion = false; let mut skip_insertion = false;
if let Some(original_docid) = original_docid { if let Some(original_docid) = original_docid {
let original_key = original_docid; let original_key = original_docid;
let base_obkv = self let base_compressed_obkv = self.index.documents.get(wtxn, &original_key)?.ok_or(
.index InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
.documents )?;
.remap_data_type::<heed::types::Bytes>()
.get(wtxn, &original_key)? let base_obkv = base_compressed_obkv.decompress_with_optional_dictionary(
.ok_or(InternalError::DatabaseMissingEntry { &mut decompression_buffer,
db_name: db_name::DOCUMENTS, dictionary.as_ref(),
key: None, )?;
})?;
// we check if the two documents are exactly equal. If it's the case we can skip this document entirely // we check if the two documents are exactly equal. If it's the case we can skip this document entirely
if base_obkv == obkv_buffer { if base_obkv.as_bytes() == obkv_buffer {
// we're not replacing anything // we're not replacing anything
self.replaced_documents_ids.remove(original_docid); self.replaced_documents_ids.remove(original_docid);
// and we need to put back the original id as it was before // and we need to put back the original id as it was before
@ -278,13 +279,12 @@ impl<'a, 'i> Transform<'a, 'i> {
document_sorter_value_buffer.clear(); document_sorter_value_buffer.clear();
document_sorter_value_buffer.push(Operation::Addition as u8); document_sorter_value_buffer.push(Operation::Addition as u8);
into_del_add_obkv( into_del_add_obkv(
KvReaderU16::new(base_obkv), base_obkv,
deladd_operation, deladd_operation,
&mut document_sorter_value_buffer, &mut document_sorter_value_buffer,
)?; )?;
self.original_sorter self.original_sorter
.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?; .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
let base_obkv = KvReader::new(base_obkv);
if let Some(flattened_obkv) = if let Some(flattened_obkv) =
Self::flatten_from_fields_ids_map(&base_obkv, &mut self.fields_ids_map)? Self::flatten_from_fields_ids_map(&base_obkv, &mut self.fields_ids_map)?
{ {
@ -348,9 +348,12 @@ impl<'a, 'i> Transform<'a, 'i> {
documents_seen: documents_count, documents_seen: documents_count,
}); });
drop(dictionary);
self.index.put_fields_ids_map(wtxn, &self.fields_ids_map)?; self.index.put_fields_ids_map(wtxn, &self.fields_ids_map)?;
self.index.put_primary_key(wtxn, &primary_key)?; self.index.put_primary_key(wtxn, &primary_key)?;
self.documents_count += documents_count; self.documents_count += documents_count;
// Now that we have a valid sorter that contains the user id and the obkv we // Now that we have a valid sorter that contains the user id and the obkv we
// give it to the last transforming function which returns the TransformOutput. // give it to the last transforming function which returns the TransformOutput.
Ok(documents_count) Ok(documents_count)
@ -1035,15 +1038,21 @@ impl<'a, 'i> Transform<'a, 'i> {
if original_sorter.is_some() || flattened_sorter.is_some() { if original_sorter.is_some() || flattened_sorter.is_some() {
let modified_faceted_fields = settings_diff.modified_faceted_fields(); let modified_faceted_fields = settings_diff.modified_faceted_fields();
let dictionary = self.index.document_decompression_dictionary(wtxn)?;
let mut original_obkv_buffer = Vec::new(); let mut original_obkv_buffer = Vec::new();
let mut flattened_obkv_buffer = Vec::new(); let mut flattened_obkv_buffer = Vec::new();
let mut document_sorter_key_buffer = Vec::new(); let mut document_sorter_key_buffer = Vec::new();
let mut buffer = Vec::new();
for result in self.index.external_documents_ids().iter(wtxn)? { for result in self.index.external_documents_ids().iter(wtxn)? {
let (external_id, docid) = result?; let (external_id, docid) = result?;
let old_obkv = self.index.documents.get(wtxn, &docid)?.ok_or( let old_compressed_obkv = self.index.documents.get(wtxn, &docid)?.ok_or(
InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None }, InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
)?; )?;
let old_obkv = old_compressed_obkv
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())?;
let injected_vectors: std::result::Result< let injected_vectors: std::result::Result<
serde_json::Map<String, serde_json::Value>, serde_json::Map<String, serde_json::Value>,
arroy::Error, arroy::Error,

View File

@ -19,6 +19,7 @@ use super::helpers::{
use super::MergeFn; use super::MergeFn;
use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind}; use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind};
use crate::facet::FacetType; use crate::facet::FacetType;
use crate::heed_codec::CompressedKvWriterU16;
use crate::index::db_name::DOCUMENTS; use crate::index::db_name::DOCUMENTS;
use crate::index::IndexEmbeddingConfig; use crate::index::IndexEmbeddingConfig;
use crate::proximity::MAX_DISTANCE; use crate::proximity::MAX_DISTANCE;
@ -162,6 +163,7 @@ pub(crate) fn write_typed_chunk_into_index(
.into_iter() .into_iter()
.map(|IndexEmbeddingConfig { name, .. }| name) .map(|IndexEmbeddingConfig { name, .. }| name)
.collect(); .collect();
let dictionary = index.document_compression_dictionary(wtxn)?;
let mut vectors_buffer = Vec::new(); let mut vectors_buffer = Vec::new();
while let Some((key, reader)) = iter.next()? { while let Some((key, reader)) = iter.next()? {
let mut writer: KvWriter<_, FieldId> = KvWriter::memory(); let mut writer: KvWriter<_, FieldId> = KvWriter::memory();
@ -211,7 +213,17 @@ pub(crate) fn write_typed_chunk_into_index(
let db = index.documents.remap_data_type::<Bytes>(); let db = index.documents.remap_data_type::<Bytes>();
if !writer.is_empty() { if !writer.is_empty() {
db.put(wtxn, &docid, &writer.into_inner().unwrap())?; let uncompressed_document_bytes = writer.into_inner().unwrap();
match dictionary.as_ref() {
Some(dictionary) => {
let compressed = CompressedKvWriterU16::new_with_dictionary(
&uncompressed_document_bytes,
dictionary,
)?;
db.put(wtxn, &docid, compressed.as_bytes())?
}
None => db.put(wtxn, &docid, &uncompressed_document_bytes)?,
}
operations.push(DocumentOperation { operations.push(DocumentOperation {
external_id: external_id.to_string(), external_id: external_id.to_string(),
internal_id: docid, internal_id: docid,
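
The same decision appears on every write path: when a dictionary has already been trained, the serialized OBKV is compressed with it, otherwise the raw bytes are stored and later recompressed once manage_compression_dictionary runs. Here is a small sketch of that branch written against zstd's bulk API and raw dictionary bytes; the milli-specific CompressedKvWriterU16 used above is not shown in this diff, so `maybe_compress` is an assumption, not its actual implementation.

use std::borrow::Cow;
use std::io;

/// Compress the serialized OBKV when a dictionary exists, otherwise store it as-is.
fn maybe_compress<'a>(
    obkv_bytes: &'a [u8],
    dictionary_bytes: Option<&[u8]>,
) -> io::Result<Cow<'a, [u8]>> {
    match dictionary_bytes {
        Some(dict) => {
            let mut compressor = zstd::bulk::Compressor::with_dictionary(19, dict)?;
            Ok(Cow::Owned(compressor.compress(obkv_bytes)?))
        }
        // No dictionary yet: the document is written uncompressed and will be
        // recompressed later when the dictionary is generated.
        None => Ok(Cow::Borrowed(obkv_bytes)),
    }
}
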

View File

@ -1769,6 +1769,8 @@ mod tests {
// Check that the searchable field is correctly set to "name" only. // Check that the searchable field is correctly set to "name" only.
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
// When we search for something that is not in // When we search for something that is not in
// the searchable fields it must not return any document. // the searchable fields it must not return any document.
let result = index.search(&rtxn).query("23").execute().unwrap(); let result = index.search(&rtxn).query("23").execute().unwrap();
@ -1777,10 +1779,17 @@ mod tests {
// When we search for something that is in the searchable fields // When we search for something that is in the searchable fields
// we must find the appropriate document. // we must find the appropriate document.
let result = index.search(&rtxn).query(r#""kevin""#).execute().unwrap(); let result = index.search(&rtxn).query(r#""kevin""#).execute().unwrap();
let documents = index.documents(&rtxn, result.documents_ids).unwrap(); let mut compressed_documents =
index.compressed_documents(&rtxn, result.documents_ids).unwrap();
let fid_map = index.fields_ids_map(&rtxn).unwrap(); let fid_map = index.fields_ids_map(&rtxn).unwrap();
assert_eq!(documents.len(), 1); assert_eq!(compressed_documents.len(), 1);
assert_eq!(documents[0].1.get(fid_map.id("name").unwrap()), Some(&br#""kevin""#[..])); let (_id, compressed_document) = compressed_documents.remove(0);
let document = compressed_document
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
assert_eq!(document.get(fid_map.id("name").unwrap()), Some(&br#""kevin""#[..]));
drop(dictionary);
drop(rtxn); drop(rtxn);
// We change the searchable fields to be the "name" field only. // We change the searchable fields to be the "name" field only.
@ -1805,6 +1814,7 @@ mod tests {
// Check that the searchable field have been reset and documents are found now. // Check that the searchable field have been reset and documents are found now.
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let fid_map = index.fields_ids_map(&rtxn).unwrap(); let fid_map = index.fields_ids_map(&rtxn).unwrap();
let user_defined_searchable_fields = index.user_defined_searchable_fields(&rtxn).unwrap(); let user_defined_searchable_fields = index.user_defined_searchable_fields(&rtxn).unwrap();
snapshot!(format!("{user_defined_searchable_fields:?}"), @"None"); snapshot!(format!("{user_defined_searchable_fields:?}"), @"None");
@ -1813,8 +1823,13 @@ mod tests {
snapshot!(format!("{searchable_fields:?}"), @r###"["id", "name", "age"]"###); snapshot!(format!("{searchable_fields:?}"), @r###"["id", "name", "age"]"###);
let result = index.search(&rtxn).query("23").execute().unwrap(); let result = index.search(&rtxn).query("23").execute().unwrap();
assert_eq!(result.documents_ids.len(), 1); assert_eq!(result.documents_ids.len(), 1);
let documents = index.documents(&rtxn, result.documents_ids).unwrap(); let mut compressed_documents =
assert_eq!(documents[0].1.get(fid_map.id("name").unwrap()), Some(&br#""kevin""#[..])); index.compressed_documents(&rtxn, result.documents_ids).unwrap();
let (_id, compressed_document) = compressed_documents.remove(0);
let document = compressed_document
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
assert_eq!(document.get(fid_map.id("name").unwrap()), Some(&br#""kevin""#[..]));
} }
#[test] #[test]
@ -1949,15 +1964,20 @@ mod tests {
// Check that the displayed fields are correctly set. // Check that the displayed fields are correctly set.
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let mut buffer = Vec::new();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let fields_ids = index.filterable_fields(&rtxn).unwrap(); let fields_ids = index.filterable_fields(&rtxn).unwrap();
assert_eq!(fields_ids, hashset! { S("age") }); assert_eq!(fields_ids, hashset! { S("age") });
// Only count the field_id 0 and level 0 facet values. // Only count the field_id 0 and level 0 facet values.
// TODO we must support typed CSVs for numbers to be understood. // TODO we must support typed CSVs for numbers to be understood.
let fidmap = index.fields_ids_map(&rtxn).unwrap(); let fidmap = index.fields_ids_map(&rtxn).unwrap();
for document in index.all_documents(&rtxn).unwrap() { for result in index.all_compressed_documents(&rtxn).unwrap() {
let document = document.unwrap(); let (_id, compressed_document) = result.unwrap();
let json = crate::obkv_to_json(&fidmap.ids().collect::<Vec<_>>(), &fidmap, document.1) let document = compressed_document
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap(); .unwrap();
let json =
crate::obkv_to_json(&fidmap.ids().collect::<Vec<_>>(), &fidmap, document).unwrap();
println!("json: {:?}", json); println!("json: {:?}", json);
} }
let count = index let count = index
@ -1968,6 +1988,7 @@ mod tests {
.unwrap() .unwrap()
.count(); .count();
assert_eq!(count, 3); assert_eq!(count, 3);
drop(dictionary);
drop(rtxn); drop(rtxn);
// Index a little more documents with new and current facets values. // Index a little more documents with new and current facets values.
@ -2057,6 +2078,7 @@ mod tests {
#[test] #[test]
fn set_asc_desc_field() { fn set_asc_desc_field() {
let mut index = TempIndex::new(); let mut index = TempIndex::new();
let mut buffer = Vec::new();
index.index_documents_config.autogenerate_docids = true; index.index_documents_config.autogenerate_docids = true;
// Set the filterable fields to be the age. // Set the filterable fields to be the age.
@ -2078,12 +2100,16 @@ mod tests {
// Run an empty query just to ensure that the search results are ordered. // Run an empty query just to ensure that the search results are ordered.
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let SearchResult { documents_ids, .. } = index.search(&rtxn).execute().unwrap(); let SearchResult { documents_ids, .. } = index.search(&rtxn).execute().unwrap();
let documents = index.documents(&rtxn, documents_ids).unwrap(); let compressed_documents = index.compressed_documents(&rtxn, documents_ids).unwrap();
// Fetch the documents' "age" field in the order in which the documents appear. // Fetch the documents' "age" field in the order in which the documents appear.
let age_field_id = index.fields_ids_map(&rtxn).unwrap().id("age").unwrap(); let age_field_id = index.fields_ids_map(&rtxn).unwrap().id("age").unwrap();
let iter = documents.into_iter().map(|(_, doc)| { let iter = compressed_documents.into_iter().map(|(_, compressed_doc)| {
let doc = compressed_doc
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
let bytes = doc.get(age_field_id).unwrap(); let bytes = doc.get(age_field_id).unwrap();
let string = std::str::from_utf8(bytes).unwrap(); let string = std::str::from_utf8(bytes).unwrap();
string.parse::<u32>().unwrap() string.parse::<u32>().unwrap()
@ -2480,6 +2506,7 @@ mod tests {
#[test] #[test]
fn setting_impact_relevancy() { fn setting_impact_relevancy() {
let mut index = TempIndex::new(); let mut index = TempIndex::new();
let mut buffer = Vec::new();
index.index_documents_config.autogenerate_docids = true; index.index_documents_config.autogenerate_docids = true;
// Set the genres setting // Set the genres setting
@ -2512,8 +2539,12 @@ mod tests {
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let SearchResult { documents_ids, .. } = index.search(&rtxn).query("S").execute().unwrap(); let SearchResult { documents_ids, .. } = index.search(&rtxn).query("S").execute().unwrap();
let first_id = documents_ids[0]; let first_id = documents_ids[0];
let documents = index.documents(&rtxn, documents_ids).unwrap(); let documents = index.compressed_documents(&rtxn, documents_ids).unwrap();
let (_, content) = documents.iter().find(|(id, _)| *id == first_id).unwrap(); let (_, compressed_content) = documents.iter().find(|(id, _)| *id == first_id).unwrap();
let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let content = compressed_content
.decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
.unwrap();
let fid = index.fields_ids_map(&rtxn).unwrap().id("title").unwrap(); let fid = index.fields_ids_map(&rtxn).unwrap().id("title").unwrap();
let line = std::str::from_utf8(content.get(fid).unwrap()).unwrap(); let line = std::str::from_utf8(content.get(fid).unwrap()).unwrap();
@ -2681,7 +2712,7 @@ mod tests {
wtxn.commit().unwrap(); wtxn.commit().unwrap();
let rtxn = index.write_txn().unwrap(); let rtxn = index.write_txn().unwrap();
let docs: StdResult<Vec<_>, _> = index.all_documents(&rtxn).unwrap().collect(); let docs: StdResult<Vec<_>, _> = index.all_compressed_documents(&rtxn).unwrap().collect();
let docs = docs.unwrap(); let docs = docs.unwrap();
assert_eq!(docs.len(), 5); assert_eq!(docs.len(), 5);
} }

View File

@ -317,7 +317,20 @@ fn criteria_ascdesc() {
wtxn.commit().unwrap(); wtxn.commit().unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
let documents = index.all_documents(&rtxn).unwrap().map(|doc| doc.unwrap()).collect::<Vec<_>>(); let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
let mut buffers = vec![Vec::new(); index.number_of_documents(&rtxn).unwrap() as usize];
let documents = index
.all_compressed_documents(&rtxn)
.unwrap()
.zip(buffers.iter_mut())
.map(|(compressed, buffer)| {
let (id, compressed) = compressed.unwrap();
let doc = compressed
.decompress_with_optional_dictionary(buffer, dictionary.as_ref())
.unwrap();
(id, doc)
})
.collect::<Vec<_>>();
for criterion in [Asc(S("name")), Desc(S("name")), Asc(S("age")), Desc(S("age"))] { for criterion in [Asc(S("name")), Desc(S("name")), Asc(S("age")), Desc(S("age"))] {
eprintln!("Testing with criterion: {:?}", &criterion); eprintln!("Testing with criterion: {:?}", &criterion);