Compare commits

8 Commits

Author          | SHA1       | Message                                                                                                  | Date
ManyTheFish     | 934b73142d | reduce the number of computed prefix                                                                     | 2025-03-27 17:57:57 +01:00
Clément Renault | bb2e9419d3 | Merge pull request #5468 from meilisearch/more-precise-post-processing (More Precise Post Processing)   | 2025-03-27 10:07:09 +00:00
Clément Renault | cf68713145 | Merge pull request #5465 from meilisearch/improve-stats-perf (Improve documents stats performances)     | 2025-03-27 09:20:14 +00:00
Kerollmops      | 811143cbe9 | Add more progress precision when doing post processing                                                   | 2025-03-27 10:17:28 +01:00
Kerollmops      | c670e9a39b | Make sure the snaps are happy                                                                            | 2025-03-26 20:03:35 +01:00
Clément Renault | 65f1b13475 | Merge pull request #5464 from meilisearch/camel-case-database-sizes (Prefer camelCase for internal database sizes db name) | 2025-03-26 16:40:39 +00:00
Kerollmops      | db7ce03763 | Improve the performances of computing the size of the documents database                                | 2025-03-26 17:40:12 +01:00
Kerollmops      | 7ed9adde29 | Prefer camelCase for internal database sizes db name                                                     | 2025-03-26 16:45:52 +01:00
15 changed files with 365 additions and 213 deletions

View File

@@ -20,6 +20,7 @@ use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Arc;
use convert_case::{Case, Casing as _};
use meilisearch_types::error::ResponseError;
use meilisearch_types::heed::{Env, WithoutTls};
use meilisearch_types::milli;
@@ -381,7 +382,10 @@ impl IndexScheduler {
Less => "-",
};
Some((dbname.to_string(), format!("{post:#.2} ({sign}{diff:#.2})").into()))
Some((
dbname.to_case(Case::Camel),
format!("{post:#.2} ({sign}{diff:#.2})").into(),
))
})
.into_iter()
.flatten()
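
This first hunk (inside `impl IndexScheduler`) switches the reported key from the raw snake_case database name to camelCase, using the `convert_case` import added above. A minimal sketch of that conversion, with an internal database name from the diffs below as the example input:

```rust
// Sketch only: demonstrates the `to_case(Case::Camel)` call added in the hunk above.
use convert_case::{Case, Casing as _};

fn main() {
    // Internal database names are snake_case...
    let db_name = "word_prefix_position_docids";
    // ...and are now reported in camelCase in the stats output.
    assert_eq!(db_name.to_case(Case::Camel), "wordPrefixPositionDocids");
}
```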

View File

@@ -518,7 +518,7 @@ impl From<index_scheduler::IndexStats> for IndexStats {
.inner_stats
.number_of_documents
.unwrap_or(stats.inner_stats.documents_database_stats.number_of_entries()),
raw_document_db_size: stats.inner_stats.documents_database_stats.total_value_size(),
raw_document_db_size: stats.inner_stats.documents_database_stats.total_size(),
avg_document_size: stats.inner_stats.documents_database_stats.average_value_size(),
is_indexing: stats.is_indexing,
number_of_embeddings: stats.inner_stats.number_of_embeddings,

View File

@@ -157,11 +157,14 @@ async fn delete_document_by_filter() {
index.wait_task(task.uid()).await.succeeded();
let (stats, _) = index.stats().await;
snapshot!(json_string!(stats), @r###"
snapshot!(json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}), @r###"
{
"numberOfDocuments": 4,
"rawDocumentDbSize": 42,
"avgDocumentSize": 10,
"rawDocumentDbSize": "[size]",
"avgDocumentSize": "[size]",
"isIndexing": false,
"numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0,
@@ -208,11 +211,14 @@ async fn delete_document_by_filter() {
"###);
let (stats, _) = index.stats().await;
snapshot!(json_string!(stats), @r###"
snapshot!(json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}), @r###"
{
"numberOfDocuments": 2,
"rawDocumentDbSize": 16,
"avgDocumentSize": 8,
"rawDocumentDbSize": "[size]",
"avgDocumentSize": "[size]",
"isIndexing": false,
"numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0,
@@ -278,11 +284,14 @@ async fn delete_document_by_filter() {
"###);
let (stats, _) = index.stats().await;
snapshot!(json_string!(stats), @r###"
snapshot!(json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}), @r###"
{
"numberOfDocuments": 1,
"rawDocumentDbSize": 12,
"avgDocumentSize": 12,
"rawDocumentDbSize": "[size]",
"avgDocumentSize": "[size]",
"isIndexing": false,
"numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0,

View File

@@ -28,12 +28,15 @@ async fn import_dump_v1_movie_raw() {
let (stats, code) = index.stats().await;
snapshot!(code, @"200 OK");
snapshot!(
json_string!(stats),
json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}),
@r###"
{
"numberOfDocuments": 53,
"rawDocumentDbSize": 21965,
"avgDocumentSize": 414,
"rawDocumentDbSize": "[size]",
"avgDocumentSize": "[size]",
"isIndexing": false,
"numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0,
@@ -185,12 +188,15 @@ async fn import_dump_v1_movie_with_settings() {
let (stats, code) = index.stats().await;
snapshot!(code, @"200 OK");
snapshot!(
json_string!(stats),
json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}),
@r###"
{
"numberOfDocuments": 53,
"rawDocumentDbSize": 21965,
"avgDocumentSize": 414,
"rawDocumentDbSize": "[size]",
"avgDocumentSize": "[size]",
"isIndexing": false,
"numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0,
@@ -355,12 +361,15 @@ async fn import_dump_v1_rubygems_with_settings() {
let (stats, code) = index.stats().await;
snapshot!(code, @"200 OK");
snapshot!(
json_string!(stats),
json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}),
@r###"
{
"numberOfDocuments": 53,
"rawDocumentDbSize": 8606,
"avgDocumentSize": 162,
"rawDocumentDbSize": "[size]",
"avgDocumentSize": "[size]",
"isIndexing": false,
"numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0,
@@ -522,12 +531,15 @@ async fn import_dump_v2_movie_raw() {
let (stats, code) = index.stats().await;
snapshot!(code, @"200 OK");
snapshot!(
json_string!(stats),
json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}),
@r###"
{
"numberOfDocuments": 53,
"rawDocumentDbSize": 21965,
"avgDocumentSize": 414,
"rawDocumentDbSize": "[size]",
"avgDocumentSize": "[size]",
"isIndexing": false,
"numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0,
@@ -679,12 +691,15 @@ async fn import_dump_v2_movie_with_settings() {
let (stats, code) = index.stats().await;
snapshot!(code, @"200 OK");
snapshot!(
json_string!(stats),
json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}),
@r###"
{
"numberOfDocuments": 53,
"rawDocumentDbSize": 21965,
"avgDocumentSize": 414,
"rawDocumentDbSize": "[size]",
"avgDocumentSize": "[size]",
"isIndexing": false,
"numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0,
@@ -846,12 +861,15 @@ async fn import_dump_v2_rubygems_with_settings() {
let (stats, code) = index.stats().await;
snapshot!(code, @"200 OK");
snapshot!(
json_string!(stats),
json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}),
@r###"
{
"numberOfDocuments": 53,
"rawDocumentDbSize": 8606,
"avgDocumentSize": 162,
"rawDocumentDbSize": "[size]",
"avgDocumentSize": "[size]",
"isIndexing": false,
"numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0,
@@ -1010,12 +1028,15 @@ async fn import_dump_v3_movie_raw() {
let (stats, code) = index.stats().await;
snapshot!(code, @"200 OK");
snapshot!(
json_string!(stats),
json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}),
@r###"
{
"numberOfDocuments": 53,
"rawDocumentDbSize": 21965,
"avgDocumentSize": 414,
"rawDocumentDbSize": "[size]",
"avgDocumentSize": "[size]",
"isIndexing": false,
"numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0,
@@ -1167,12 +1188,15 @@ async fn import_dump_v3_movie_with_settings() {
let (stats, code) = index.stats().await;
snapshot!(code, @"200 OK");
snapshot!(
json_string!(stats),
json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}),
@r###"
{
"numberOfDocuments": 53,
"rawDocumentDbSize": 21965,
"avgDocumentSize": 414,
"rawDocumentDbSize": "[size]",
"avgDocumentSize": "[size]",
"isIndexing": false,
"numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0,
@@ -1334,12 +1358,15 @@ async fn import_dump_v3_rubygems_with_settings() {
let (stats, code) = index.stats().await;
snapshot!(code, @"200 OK");
snapshot!(
json_string!(stats),
json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}),
@r###"
{
"numberOfDocuments": 53,
"rawDocumentDbSize": 8606,
"avgDocumentSize": 162,
"rawDocumentDbSize": "[size]",
"avgDocumentSize": "[size]",
"isIndexing": false,
"numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0,
@@ -1498,12 +1525,15 @@ async fn import_dump_v4_movie_raw() {
let (stats, code) = index.stats().await;
snapshot!(code, @"200 OK");
snapshot!(
json_string!(stats),
json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}),
@r###"
{
"numberOfDocuments": 53,
"rawDocumentDbSize": 21965,
"avgDocumentSize": 414,
"rawDocumentDbSize": "[size]",
"avgDocumentSize": "[size]",
"isIndexing": false,
"numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0,
@@ -1655,12 +1685,15 @@ async fn import_dump_v4_movie_with_settings() {
let (stats, code) = index.stats().await;
snapshot!(code, @"200 OK");
snapshot!(
json_string!(stats),
json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}),
@r###"
{
"numberOfDocuments": 53,
"rawDocumentDbSize": 21965,
"avgDocumentSize": 414,
"rawDocumentDbSize": "[size]",
"avgDocumentSize": "[size]",
"isIndexing": false,
"numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0,
@@ -1822,12 +1855,15 @@ async fn import_dump_v4_rubygems_with_settings() {
let (stats, code) = index.stats().await;
snapshot!(code, @"200 OK");
snapshot!(
json_string!(stats),
json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}),
@r###"
{
"numberOfDocuments": 53,
"rawDocumentDbSize": 8606,
"avgDocumentSize": 162,
"rawDocumentDbSize": "[size]",
"avgDocumentSize": "[size]",
"isIndexing": false,
"numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0,
@@ -1994,11 +2030,14 @@ async fn import_dump_v5() {
let (stats, code) = index1.stats().await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(stats), @r###"
snapshot!(json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}), @r###"
{
"numberOfDocuments": 10,
"rawDocumentDbSize": 6782,
"avgDocumentSize": 678,
"rawDocumentDbSize": "[size]",
"avgDocumentSize": "[size]",
"isIndexing": false,
"numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0,
@@ -2031,12 +2070,15 @@ async fn import_dump_v5() {
let (stats, code) = index2.stats().await;
snapshot!(code, @"200 OK");
snapshot!(
json_string!(stats),
json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}),
@r###"
{
"numberOfDocuments": 10,
"rawDocumentDbSize": 6782,
"avgDocumentSize": 678,
"rawDocumentDbSize": "[size]",
"avgDocumentSize": "[size]",
"isIndexing": false,
"numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0,

View File

@@ -110,11 +110,14 @@ async fn add_remove_embeddings() {
index.wait_task(response.uid()).await.succeeded();
let (stats, _code) = index.stats().await;
snapshot!(json_string!(stats), @r###"
snapshot!(json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}), @r###"
{
"numberOfDocuments": 2,
"rawDocumentDbSize": 27,
"avgDocumentSize": 13,
"rawDocumentDbSize": "[size]",
"avgDocumentSize": "[size]",
"isIndexing": false,
"numberOfEmbeddings": 5,
"numberOfEmbeddedDocuments": 2,
@@ -135,11 +138,14 @@ async fn add_remove_embeddings() {
index.wait_task(response.uid()).await.succeeded();
let (stats, _code) = index.stats().await;
snapshot!(json_string!(stats), @r###"
snapshot!(json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}), @r###"
{
"numberOfDocuments": 2,
"rawDocumentDbSize": 27,
"avgDocumentSize": 13,
"rawDocumentDbSize": "[size]",
"avgDocumentSize": "[size]",
"isIndexing": false,
"numberOfEmbeddings": 3,
"numberOfEmbeddedDocuments": 2,
@@ -160,11 +166,14 @@ async fn add_remove_embeddings() {
index.wait_task(response.uid()).await.succeeded();
let (stats, _code) = index.stats().await;
snapshot!(json_string!(stats), @r###"
snapshot!(json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}), @r###"
{
"numberOfDocuments": 2,
"rawDocumentDbSize": 27,
"avgDocumentSize": 13,
"rawDocumentDbSize": "[size]",
"avgDocumentSize": "[size]",
"isIndexing": false,
"numberOfEmbeddings": 2,
"numberOfEmbeddedDocuments": 2,
@@ -186,11 +195,14 @@ async fn add_remove_embeddings() {
index.wait_task(response.uid()).await.succeeded();
let (stats, _code) = index.stats().await;
snapshot!(json_string!(stats), @r###"
snapshot!(json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}), @r###"
{
"numberOfDocuments": 2,
"rawDocumentDbSize": 27,
"avgDocumentSize": 13,
"rawDocumentDbSize": "[size]",
"avgDocumentSize": "[size]",
"isIndexing": false,
"numberOfEmbeddings": 2,
"numberOfEmbeddedDocuments": 1,
@@ -236,11 +248,14 @@ async fn add_remove_embedded_documents() {
index.wait_task(response.uid()).await.succeeded();
let (stats, _code) = index.stats().await;
snapshot!(json_string!(stats), @r###"
snapshot!(json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}), @r###"
{
"numberOfDocuments": 2,
"rawDocumentDbSize": 27,
"avgDocumentSize": 13,
"rawDocumentDbSize": "[size]",
"avgDocumentSize": "[size]",
"isIndexing": false,
"numberOfEmbeddings": 5,
"numberOfEmbeddedDocuments": 2,
@@ -257,11 +272,14 @@ async fn add_remove_embedded_documents() {
index.wait_task(response.uid()).await.succeeded();
let (stats, _code) = index.stats().await;
snapshot!(json_string!(stats), @r###"
snapshot!(json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}), @r###"
{
"numberOfDocuments": 1,
"rawDocumentDbSize": 13,
"avgDocumentSize": 13,
"rawDocumentDbSize": "[size]",
"avgDocumentSize": "[size]",
"isIndexing": false,
"numberOfEmbeddings": 3,
"numberOfEmbeddedDocuments": 1,
@@ -290,11 +308,14 @@ async fn update_embedder_settings() {
index.wait_task(response.uid()).await.succeeded();
let (stats, _code) = index.stats().await;
snapshot!(json_string!(stats), @r###"
snapshot!(json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}), @r###"
{
"numberOfDocuments": 2,
"rawDocumentDbSize": 108,
"avgDocumentSize": 54,
"rawDocumentDbSize": "[size]",
"avgDocumentSize": "[size]",
"isIndexing": false,
"numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0,
@@ -326,11 +347,14 @@ async fn update_embedder_settings() {
server.wait_task(response.uid()).await.succeeded();
let (stats, _code) = index.stats().await;
snapshot!(json_string!(stats), @r###"
snapshot!(json_string!(stats, {
".rawDocumentDbSize" => "[size]",
".avgDocumentSize" => "[size]",
}), @r###"
{
"numberOfDocuments": 2,
"rawDocumentDbSize": 108,
"avgDocumentSize": 54,
"rawDocumentDbSize": "[size]",
"avgDocumentSize": "[size]",
"isIndexing": false,
"numberOfEmbeddings": 3,
"numberOfEmbeddedDocuments": 2,

View File

@@ -133,7 +133,9 @@ async fn check_the_index_scheduler(server: &Server) {
let (stats, _) = server.stats().await;
assert_json_snapshot!(stats, {
".databaseSize" => "[bytes]",
".usedDatabaseSize" => "[bytes]"
".usedDatabaseSize" => "[bytes]",
".indexes.kefir.rawDocumentDbSize" => "[bytes]",
".indexes.kefir.avgDocumentSize" => "[bytes]",
},
@r###"
{
@@ -143,8 +145,8 @@ async fn check_the_index_scheduler(server: &Server) {
"indexes": {
"kefir": {
"numberOfDocuments": 1,
"rawDocumentDbSize": 109,
"avgDocumentSize": 109,
"rawDocumentDbSize": "[bytes]",
"avgDocumentSize": "[bytes]",
"isIndexing": false,
"numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0,
@@ -217,7 +219,9 @@ async fn check_the_index_scheduler(server: &Server) {
let (stats, _) = server.stats().await;
assert_json_snapshot!(stats, {
".databaseSize" => "[bytes]",
".usedDatabaseSize" => "[bytes]"
".usedDatabaseSize" => "[bytes]",
".indexes.kefir.rawDocumentDbSize" => "[bytes]",
".indexes.kefir.avgDocumentSize" => "[bytes]",
},
@r###"
{
@@ -227,8 +231,8 @@ async fn check_the_index_scheduler(server: &Server) {
"indexes": {
"kefir": {
"numberOfDocuments": 1,
"rawDocumentDbSize": 109,
"avgDocumentSize": 109,
"rawDocumentDbSize": "[bytes]",
"avgDocumentSize": "[bytes]",
"isIndexing": false,
"numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0,
@@ -245,11 +249,14 @@ async fn check_the_index_scheduler(server: &Server) {
"###);
let index = server.index("kefir");
let (stats, _) = index.stats().await;
snapshot!(stats, @r###"
snapshot!(json_string!(stats, {
".rawDocumentDbSize" => "[bytes]",
".avgDocumentSize" => "[bytes]",
}), @r###"
{
"numberOfDocuments": 1,
"rawDocumentDbSize": 109,
"avgDocumentSize": 109,
"rawDocumentDbSize": "[bytes]",
"avgDocumentSize": "[bytes]",
"isIndexing": false,
"numberOfEmbeddings": 0,
"numberOfEmbeddedDocuments": 0,

View File

@@ -1,8 +1,13 @@
use heed::types::Bytes;
use std::mem;
use heed::Database;
use heed::DatabaseStat;
use heed::RoTxn;
use heed::Unspecified;
use serde::{Deserialize, Serialize};
use crate::BEU32;
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "camelCase")]
/// The stats of a database.
@@ -20,58 +25,24 @@ impl DatabaseStats {
///
/// This function iterates over the whole database and computes the stats.
/// It is not efficient and should be cached somewhere.
pub(crate) fn new(database: Database<Bytes, Bytes>, rtxn: &RoTxn<'_>) -> heed::Result<Self> {
let mut database_stats =
Self { number_of_entries: 0, total_key_size: 0, total_value_size: 0 };
pub(crate) fn new(
database: Database<BEU32, Unspecified>,
rtxn: &RoTxn<'_>,
) -> heed::Result<Self> {
let DatabaseStat { page_size, depth: _, branch_pages, leaf_pages, overflow_pages, entries } =
database.stat(rtxn)?;
let mut iter = database.iter(rtxn)?;
while let Some((key, value)) = iter.next().transpose()? {
let key_size = key.len() as u64;
let value_size = value.len() as u64;
database_stats.total_key_size += key_size;
database_stats.total_value_size += value_size;
}
// We first take the total size without overflow pages as the overflow pages contains the values and only that.
let total_size = (branch_pages + leaf_pages + overflow_pages) * page_size as usize;
// We compute an estimated size for the keys.
let total_key_size = entries * (mem::size_of::<u32>() + 4);
let total_value_size = total_size - total_key_size;
database_stats.number_of_entries = database.len(rtxn)?;
Ok(database_stats)
}
/// Recomputes the stats of the database and returns the new stats.
///
/// This function is used to update the stats of the database when some keys are modified.
/// It is more efficient than the `new` function because it does not iterate over the whole database but only the modified keys comparing the before and after states.
pub(crate) fn recompute<I, K>(
mut stats: Self,
database: Database<Bytes, Bytes>,
before_rtxn: &RoTxn<'_>,
after_rtxn: &RoTxn<'_>,
modified_keys: I,
) -> heed::Result<Self>
where
I: IntoIterator<Item = K>,
K: AsRef<[u8]>,
{
for key in modified_keys {
let key = key.as_ref();
if let Some(value) = database.get(after_rtxn, key)? {
let key_size = key.len() as u64;
let value_size = value.len() as u64;
stats.total_key_size = stats.total_key_size.saturating_add(key_size);
stats.total_value_size = stats.total_value_size.saturating_add(value_size);
}
if let Some(value) = database.get(before_rtxn, key)? {
let key_size = key.len() as u64;
let value_size = value.len() as u64;
stats.total_key_size = stats.total_key_size.saturating_sub(key_size);
stats.total_value_size = stats.total_value_size.saturating_sub(value_size);
}
}
stats.number_of_entries = database.len(after_rtxn)?;
Ok(stats)
Ok(Self {
number_of_entries: entries as u64,
total_key_size: total_key_size as u64,
total_value_size: total_value_size as u64,
})
}
pub fn average_key_size(&self) -> u64 {
@@ -86,6 +57,10 @@ impl DatabaseStats {
self.number_of_entries
}
pub fn total_size(&self) -> u64 {
self.total_key_size + self.total_value_size
}
pub fn total_key_size(&self) -> u64 {
self.total_key_size
}
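
This file carries the core of `improve-stats-perf`: `DatabaseStats::new` no longer iterates every entry of the documents database but derives the sizes from LMDB's `stat()` counters in constant time, and the incremental `recompute` path is dropped. A small sketch of the arithmetic used above, with made-up page counts as input (the parameters mirror the fields of `heed::DatabaseStat`):

```rust
// Sketch of the constant-time size estimate introduced above.
// The numbers in `main` are hypothetical.
fn estimated_sizes(
    page_size: u32,
    branch_pages: usize,
    leaf_pages: usize,
    overflow_pages: usize,
    entries: usize,
) -> (u64, u64) {
    // All pages occupied by the database (overflow pages hold the large values).
    let total_size = (branch_pages + leaf_pages + overflow_pages) * page_size as usize;
    // Keys are BEU32 document ids; the extra 4 bytes mirror the per-entry
    // overhead assumed in the hunk above.
    let total_key_size = entries * (std::mem::size_of::<u32>() + 4);
    // Whatever remains is attributed to the values.
    let total_value_size = total_size - total_key_size;
    (total_key_size as u64, total_value_size as u64)
}

fn main() {
    // 4 KiB pages, 1 branch page, 10 leaf pages, 3 overflow pages, 53 documents.
    let (keys, values) = estimated_sizes(4096, 1, 10, 3, 53);
    println!("keys ≈ {keys} B, values ≈ {values} B, total ≈ {} B", keys + values);
}
```

Because the per-entry sizes are now estimates rather than exact sums, the API route hunk above switches `raw_document_db_size` from `total_value_size()` to the new `total_size()`, and the test snapshots stop asserting exact byte counts.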

View File

@@ -411,38 +411,6 @@ impl Index {
Ok(count.unwrap_or_default())
}
/// Updates the stats of the documents database based on the previous stats and the modified docids.
pub fn update_documents_stats(
&self,
wtxn: &mut RwTxn<'_>,
modified_docids: roaring::RoaringBitmap,
) -> Result<()> {
let before_rtxn = self.read_txn()?;
let document_stats = match self.documents_stats(&before_rtxn)? {
Some(before_stats) => DatabaseStats::recompute(
before_stats,
self.documents.remap_types(),
&before_rtxn,
wtxn,
modified_docids.iter().map(|docid| docid.to_be_bytes()),
)?,
None => {
// This should never happen when there are already documents in the index, the documents stats should be present.
// If it happens, it means that the index was not properly initialized/upgraded.
debug_assert_eq!(
self.documents.len(&before_rtxn)?,
0,
"The documents stats should be present when there are documents in the index"
);
tracing::warn!("No documents stats found, creating new ones");
DatabaseStats::new(self.documents.remap_types(), &*wtxn)?
}
};
self.put_documents_stats(wtxn, document_stats)?;
Ok(())
}
/// Writes the stats of the documents database.
pub fn put_documents_stats(
&self,

View File

@@ -37,12 +37,12 @@ pub struct DatabaseCache<'ctx> {
pub words_fst: Option<fst::Set<Cow<'ctx, [u8]>>>,
pub word_position_docids: FxHashMap<(Interned<String>, u16), Option<Cow<'ctx, [u8]>>>,
pub word_prefix_position_docids: FxHashMap<(Interned<String>, u16), Option<Cow<'ctx, [u8]>>>,
pub word_prefix_position_docids: FxHashMap<(Interned<String>, u16), Option<RoaringBitmap>>,
pub word_positions: FxHashMap<Interned<String>, Vec<u16>>,
pub word_prefix_positions: FxHashMap<Interned<String>, Vec<u16>>,
pub word_fid_docids: FxHashMap<(Interned<String>, u16), Option<Cow<'ctx, [u8]>>>,
pub word_prefix_fid_docids: FxHashMap<(Interned<String>, u16), Option<Cow<'ctx, [u8]>>>,
pub word_prefix_fid_docids: FxHashMap<(Interned<String>, u16), Option<RoaringBitmap>>,
pub word_fids: FxHashMap<Interned<String>, Vec<u16>>,
pub word_prefix_fids: FxHashMap<Interned<String>, Vec<u16>>,
}
@@ -562,14 +562,46 @@ impl<'ctx> SearchContext<'ctx> {
return Ok(None);
}
DatabaseCache::get_value(
self.txn,
(word_prefix, fid),
&(self.word_interner.get(word_prefix).as_str(), fid),
&mut self.db_cache.word_prefix_fid_docids,
universe,
self.index.word_prefix_fid_docids.remap_data_type::<Bytes>(),
)
let cache = &mut self.db_cache.word_prefix_fid_docids;
let prefix_db = &self.index.word_prefix_fid_docids;
let db = &self.index.word_fid_docids;
if let Entry::Vacant(entry) = cache.entry((word_prefix, fid)) {
let word_prefix_bytes = self.word_interner.get(word_prefix).as_bytes().to_owned();
let word_prefix_str = std::str::from_utf8(&word_prefix_bytes).unwrap();
match prefix_db.get(self.txn, &(word_prefix_str, fid))? {
Some(mut bitmap) => {
if let Some(universe) = universe {
bitmap &= universe;
}
entry.insert(Some(bitmap));
}
None => {
let mut key = word_prefix_bytes.clone();
key.push(0);
let remap_key_type = db
.remap_key_type::<Bytes>()
.prefix_iter(self.txn, &key)?
.remap_key_type::<StrBEU16Codec>();
let mut bitmap = RoaringBitmap::new();
for result in remap_key_type {
let ((_, pos), value) = result?;
if pos == fid {
if let Some(universe) = universe {
bitmap |= value & universe;
} else {
bitmap |= value;
}
}
}
entry.insert(Some(bitmap));
}
}
}
Ok(cache.get(&(word_prefix, fid)).unwrap().clone())
}
pub fn get_db_word_fids(&mut self, word: Interned<String>) -> Result<Vec<u16>> {
@@ -605,6 +637,7 @@ impl<'ctx> SearchContext<'ctx> {
let mut key = self.word_interner.get(word_prefix).as_bytes().to_owned();
key.push(0);
let mut fids = vec![];
// TODO: This is no longer exhaustive; we should iterate over all fids.
let remap_key_type = self
.index
.word_prefix_fid_docids
@@ -612,11 +645,7 @@ impl<'ctx> SearchContext<'ctx> {
.prefix_iter(self.txn, &key)?
.remap_key_type::<StrBEU16Codec>();
for result in remap_key_type {
let ((_, fid), value) = result?;
// filling other caches to avoid searching for them again
self.db_cache
.word_prefix_fid_docids
.insert((word_prefix, fid), Some(Cow::Borrowed(value)));
let ((_, fid), _value) = result?;
fids.push(fid);
}
entry.insert(fids.clone());
@@ -648,14 +677,46 @@ impl<'ctx> SearchContext<'ctx> {
word_prefix: Interned<String>,
position: u16,
) -> Result<Option<RoaringBitmap>> {
DatabaseCache::get_value(
self.txn,
(word_prefix, position),
&(self.word_interner.get(word_prefix).as_str(), position),
&mut self.db_cache.word_prefix_position_docids,
universe,
self.index.word_prefix_position_docids.remap_data_type::<Bytes>(),
)
let cache = &mut self.db_cache.word_prefix_position_docids;
let prefix_db = &self.index.word_prefix_position_docids;
let db = &self.index.word_position_docids;
if let Entry::Vacant(entry) = cache.entry((word_prefix, position)) {
let word_prefix_bytes = self.word_interner.get(word_prefix).as_bytes().to_owned();
let word_prefix_str = std::str::from_utf8(&word_prefix_bytes).unwrap();
match prefix_db.get(self.txn, &(word_prefix_str, position))? {
Some(mut bitmap) => {
if let Some(universe) = universe {
bitmap &= universe;
}
entry.insert(Some(bitmap));
}
None => {
let mut key = word_prefix_bytes.clone();
key.push(0);
let remap_key_type = db
.remap_key_type::<Bytes>()
.prefix_iter(self.txn, &key)?
.remap_key_type::<StrBEU16Codec>();
let mut bitmap = RoaringBitmap::new();
for result in remap_key_type {
let ((_, pos), value) = result?;
if pos == position {
if let Some(universe) = universe {
bitmap |= value & universe;
} else {
bitmap |= value;
}
}
}
entry.insert(Some(bitmap));
}
}
}
Ok(cache.get(&(word_prefix, position)).unwrap().clone())
}
pub fn get_db_word_positions(&mut self, word: Interned<String>) -> Result<Vec<u16>> {
@@ -696,6 +757,7 @@ impl<'ctx> SearchContext<'ctx> {
let mut key = self.word_interner.get(word_prefix).as_bytes().to_owned();
key.push(0);
let mut positions = vec![];
// TODO: This is no longer exhaustive; we should iterate over all positions.
let remap_key_type = self
.index
.word_prefix_position_docids
@@ -703,11 +765,7 @@ impl<'ctx> SearchContext<'ctx> {
.prefix_iter(self.txn, &key)?
.remap_key_type::<StrBEU16Codec>();
for result in remap_key_type {
let ((_, position), value) = result?;
// filling other caches to avoid searching for them again
self.db_cache
.word_prefix_position_docids
.insert((word_prefix, position), Some(Cow::Borrowed(value)));
let ((_, position), _value) = result?;
positions.push(position);
}
entry.insert(positions.clone());
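
In the search `DatabaseCache`, `word_prefix_fid_docids` and `word_prefix_position_docids` now cache decoded `RoaringBitmap`s instead of raw bytes, and when the precomputed prefix database has no entry (the prefix was skipped at indexing time), the bitmap is rebuilt by unioning the per-word entries sharing the prefix, filtered by fid or position and restricted to the universe. A minimal sketch of that fallback union, with an in-memory slice standing in for the heed prefix iterator (names and data are illustrative):

```rust
// Sketch only: rebuild a prefix bitmap from individual word entries when the
// precomputed prefix database has no entry. The slice stands in for a
// `word_fid_docids` prefix iterator.
use roaring::RoaringBitmap;

fn prefix_fid_docids(
    entries: &[(&str, u16, RoaringBitmap)],
    prefix: &str,
    fid: u16,
    universe: Option<&RoaringBitmap>,
) -> RoaringBitmap {
    let mut bitmap = RoaringBitmap::new();
    for (word, entry_fid, docids) in entries {
        if word.starts_with(prefix) && *entry_fid == fid {
            match universe {
                // Restrict each word's docids to the current universe before merging.
                Some(universe) => bitmap |= docids & universe,
                None => bitmap |= docids,
            }
        }
    }
    bitmap
}

fn main() {
    let docs = |ids: &[u32]| ids.iter().copied().collect::<RoaringBitmap>();
    let entries = vec![
        ("house", 1u16, docs(&[1, 2, 3])),
        ("housing", 1, docs(&[3, 4])),
        ("horse", 1, docs(&[9])),
    ];
    let docids = prefix_fid_docids(&entries, "hous", 1, None);
    assert_eq!(docids.iter().collect::<Vec<_>>(), vec![1, 2, 3, 4]);
}
```

The two `TODO` comments above flag the trade-off: since rarely used prefix entries are no longer precomputed (see the last hunk of this compare), iterating the prefix database alone is no longer exhaustive.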

View File

@@ -28,6 +28,7 @@ pub use self::helpers::*;
pub use self::transform::{Transform, TransformOutput};
use super::facet::clear_facet_levels_based_on_settings_diff;
use super::new::StdResult;
use crate::database_stats::DatabaseStats;
use crate::documents::{obkv_to_object, DocumentsBatchReader};
use crate::error::{Error, InternalError};
use crate::index::{PrefixSearch, PrefixSettings};
@@ -476,7 +477,8 @@ where
if !settings_diff.settings_update_only {
// Update the stats of the documents database when there is a document update.
self.index.update_documents_stats(self.wtxn, modified_docids)?;
let stats = DatabaseStats::new(self.index.documents.remap_data_type(), self.wtxn)?;
self.index.put_documents_stats(self.wtxn, stats)?;
}
// We write the field distribution into the main database
self.index.put_field_distribution(self.wtxn, &field_distribution)?;

View File

@@ -234,7 +234,6 @@ where
embedders,
field_distribution,
document_ids,
modified_docids,
)?;
Ok(congestion)

View File

@@ -7,12 +7,13 @@ use itertools::{merge_join_by, EitherOrBoth};
use super::document_changes::IndexingContext;
use crate::facet::FacetType;
use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY};
use crate::progress::Progress;
use crate::update::del_add::DelAdd;
use crate::update::facet::new_incremental::FacetsUpdateIncremental;
use crate::update::facet::{FACET_GROUP_SIZE, FACET_MAX_GROUP_SIZE, FACET_MIN_LEVEL_SIZE};
use crate::update::new::facet_search_builder::FacetSearchBuilder;
use crate::update::new::merger::FacetFieldIdDelta;
use crate::update::new::steps::IndexingStep;
use crate::update::new::steps::{IndexingStep, PostProcessingFacets, PostProcessingWords};
use crate::update::new::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder};
use crate::update::new::words_prefix_docids::{
compute_exact_word_prefix_docids, compute_word_prefix_docids, compute_word_prefix_fid_docids,
@@ -33,11 +34,23 @@ where
{
let index = indexing_context.index;
indexing_context.progress.update_progress(IndexingStep::PostProcessingFacets);
compute_facet_level_database(index, wtxn, facet_field_ids_delta, &mut global_fields_ids_map)?;
compute_facet_search_database(index, wtxn, global_fields_ids_map)?;
compute_facet_level_database(
index,
wtxn,
facet_field_ids_delta,
&mut global_fields_ids_map,
indexing_context.progress,
)?;
compute_facet_search_database(index, wtxn, global_fields_ids_map, indexing_context.progress)?;
indexing_context.progress.update_progress(IndexingStep::PostProcessingWords);
if let Some(prefix_delta) = compute_word_fst(index, wtxn)? {
compute_prefix_database(index, wtxn, prefix_delta, indexing_context.grenad_parameters)?;
if let Some(prefix_delta) = compute_word_fst(index, wtxn, indexing_context.progress)? {
compute_prefix_database(
index,
wtxn,
prefix_delta,
indexing_context.grenad_parameters,
indexing_context.progress,
)?;
};
Ok(())
}
@@ -48,21 +61,32 @@ fn compute_prefix_database(
wtxn: &mut RwTxn,
prefix_delta: PrefixDelta,
grenad_parameters: &GrenadParameters,
progress: &Progress,
) -> Result<()> {
let PrefixDelta { modified, deleted } = prefix_delta;
// Compute word prefix docids
progress.update_progress(PostProcessingWords::WordPrefixDocids);
compute_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
// Compute exact word prefix docids
progress.update_progress(PostProcessingWords::ExactWordPrefixDocids);
compute_exact_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
// Compute word prefix fid docids
progress.update_progress(PostProcessingWords::WordPrefixFieldIdDocids);
compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted, grenad_parameters)?;
// Compute word prefix position docids
progress.update_progress(PostProcessingWords::WordPrefixPositionDocids);
compute_word_prefix_position_docids(wtxn, index, &modified, &deleted, grenad_parameters)
}
#[tracing::instrument(level = "trace", skip_all, target = "indexing")]
fn compute_word_fst(index: &Index, wtxn: &mut RwTxn) -> Result<Option<PrefixDelta>> {
fn compute_word_fst(
index: &Index,
wtxn: &mut RwTxn,
progress: &Progress,
) -> Result<Option<PrefixDelta>> {
let rtxn = index.read_txn()?;
progress.update_progress(PostProcessingWords::WordFst);
let words_fst = index.words_fst(&rtxn)?;
let mut word_fst_builder = WordFstBuilder::new(&words_fst)?;
let prefix_settings = index.prefix_settings(&rtxn)?;
@@ -112,8 +136,10 @@ fn compute_facet_search_database(
index: &Index,
wtxn: &mut RwTxn,
global_fields_ids_map: GlobalFieldsIdsMap,
progress: &Progress,
) -> Result<()> {
let rtxn = index.read_txn()?;
progress.update_progress(PostProcessingFacets::FacetSearch);
// if the facet search is not enabled, we can skip the rest of the function
if !index.facet_search(wtxn)? {
@@ -171,10 +197,16 @@ fn compute_facet_level_database(
wtxn: &mut RwTxn,
mut facet_field_ids_delta: FacetFieldIdsDelta,
global_fields_ids_map: &mut GlobalFieldsIdsMap,
progress: &Progress,
) -> Result<()> {
let rtxn = index.read_txn()?;
let filterable_attributes_rules = index.filterable_attributes_rules(&rtxn)?;
for (fid, delta) in facet_field_ids_delta.consume_facet_string_delta() {
let mut deltas: Vec<_> = facet_field_ids_delta.consume_facet_string_delta().collect();
// We move all bulks at the front and incrementals (others) at the end.
deltas.sort_by_key(|(_, delta)| if let FacetFieldIdDelta::Bulk = delta { 0 } else { 1 });
for (fid, delta) in deltas {
// skip field ids that should not be facet leveled
let Some(metadata) = global_fields_ids_map.metadata(fid) else {
continue;
@@ -187,11 +219,13 @@ fn compute_facet_level_database(
let _entered = span.enter();
match delta {
FacetFieldIdDelta::Bulk => {
progress.update_progress(PostProcessingFacets::StringsBulk);
tracing::debug!(%fid, "bulk string facet processing");
FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::String)
.execute(wtxn)?
}
FacetFieldIdDelta::Incremental(delta_data) => {
progress.update_progress(PostProcessingFacets::StringsIncremental);
tracing::debug!(%fid, len=%delta_data.len(), "incremental string facet processing");
FacetsUpdateIncremental::new(
index,
@@ -207,16 +241,22 @@ fn compute_facet_level_database(
}
}
for (fid, delta) in facet_field_ids_delta.consume_facet_number_delta() {
let mut deltas: Vec<_> = facet_field_ids_delta.consume_facet_number_delta().collect();
// We move all bulks at the front and incrementals (others) at the end.
deltas.sort_by_key(|(_, delta)| if let FacetFieldIdDelta::Bulk = delta { 0 } else { 1 });
for (fid, delta) in deltas {
let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number");
let _entered = span.enter();
match delta {
FacetFieldIdDelta::Bulk => {
progress.update_progress(PostProcessingFacets::NumbersBulk);
tracing::debug!(%fid, "bulk number facet processing");
FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::Number)
.execute(wtxn)?
}
FacetFieldIdDelta::Incremental(delta_data) => {
progress.update_progress(PostProcessingFacets::NumbersIncremental);
tracing::debug!(%fid, len=%delta_data.len(), "incremental number facet processing");
FacetsUpdateIncremental::new(
index,

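Two changes run through this post-processing file: every stage now reports a dedicated progress variant (`PostProcessingFacets`, `PostProcessingWords`, defined further down), and facet deltas are sorted so bulk rebuilds run before incremental updates. A tiny sketch of that ordering trick with a stand-in enum (the real type is `FacetFieldIdDelta`):

```rust
// Sketch: stable sort keyed on 0 for bulk and 1 for incremental, exactly like
// the `sort_by_key` call in the hunk above. The enum here is a stand-in.
#[derive(Debug)]
enum Delta {
    Bulk,
    Incremental(usize),
}

fn main() {
    let mut deltas = vec![
        (3u16, Delta::Incremental(12)),
        (1, Delta::Bulk),
        (2, Delta::Incremental(4)),
        (0, Delta::Bulk),
    ];
    // Bulk deltas move to the front; incremental ones keep their relative order.
    deltas.sort_by_key(|(_, delta)| if let Delta::Bulk = delta { 0 } else { 1 });
    println!("{deltas:?}"); // [(1, Bulk), (0, Bulk), (3, Incremental(12)), (2, Incremental(4))]
}
```
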
View File

@@ -7,6 +7,7 @@ use rand::SeedableRng as _;
use time::OffsetDateTime;
use super::super::channel::*;
use crate::database_stats::DatabaseStats;
use crate::documents::PrimaryKey;
use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
use crate::index::IndexEmbeddingConfig;
@@ -142,7 +143,6 @@ pub(super) fn update_index(
embedders: EmbeddingConfigs,
field_distribution: std::collections::BTreeMap<String, u64>,
document_ids: roaring::RoaringBitmap,
modified_docids: roaring::RoaringBitmap,
) -> Result<()> {
index.put_fields_ids_map(wtxn, new_fields_ids_map.as_fields_ids_map())?;
if let Some(new_primary_key) = new_primary_key {
@@ -153,7 +153,8 @@ pub(super) fn update_index(
index.put_field_distribution(wtxn, &field_distribution)?;
index.put_documents_ids(wtxn, &document_ids)?;
index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?;
index.update_documents_stats(wtxn, modified_docids)?;
let stats = DatabaseStats::new(index.documents.remap_data_type(), wtxn)?;
index.put_documents_stats(wtxn, stats)?;
Ok(())
}

View File

@@ -20,3 +20,23 @@ make_enum_progress! {
Finalizing,
}
}
make_enum_progress! {
pub enum PostProcessingFacets {
StringsBulk,
StringsIncremental,
NumbersBulk,
NumbersIncremental,
FacetSearch,
}
}
make_enum_progress! {
pub enum PostProcessingWords {
WordFst,
WordPrefixDocids,
ExactWordPrefixDocids,
WordPrefixFieldIdDocids,
WordPrefixPositionDocids,
}
}

View File

@@ -291,6 +291,9 @@ impl<'a, 'rtxn> FrozenPrefixIntegerBitmaps<'a, 'rtxn> {
let (_word, pos) = StrBEU16Codec::bytes_decode(key).map_err(Error::Decoding)?;
positions.entry(pos).or_insert_with(Vec::new).push(bytes);
}
// We remove all the positions that have 100 bitmaps or fewer.
positions.retain(|_, bitmaps| bitmaps.len() > 100);
assert!(prefixes_bitmaps.insert(prefix.as_str(), positions).is_none());
}