mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-11-04 09:56:28 +00:00 
			
		
		
		
	write stats after rebuilding facet distribution
This commit is contained in:
		@@ -10,11 +10,15 @@ use anyhow::Context;
 | 
			
		||||
use file_store::FileStore;
 | 
			
		||||
use indexmap::IndexMap;
 | 
			
		||||
use meilisearch_types::milli::documents::DocumentsBatchReader;
 | 
			
		||||
use milli::heed::types::Str;
 | 
			
		||||
use milli::heed::{Database, EnvOpenOptions};
 | 
			
		||||
use milli::heed::types::{SerdeJson, Str};
 | 
			
		||||
use milli::heed::{Database, EnvOpenOptions, RoTxn, RwTxn};
 | 
			
		||||
use milli::progress::Step;
 | 
			
		||||
use milli::{FieldDistribution, Index};
 | 
			
		||||
use serde::Serialize;
 | 
			
		||||
use serde_json::value::RawValue;
 | 
			
		||||
use tempfile::NamedTempFile;
 | 
			
		||||
use time::OffsetDateTime;
 | 
			
		||||
use uuid::Uuid;
 | 
			
		||||
 | 
			
		||||
use crate::try_opening_database;
 | 
			
		||||
use crate::uuid_codec::UuidCodec;
 | 
			
		||||
@@ -100,20 +104,30 @@ fn rebuild_field_distribution(db_path: &Path) -> anyhow::Result<()> {
 | 
			
		||||
    let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
 | 
			
		||||
        .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
 | 
			
		||||
 | 
			
		||||
    let sched_rtxn = env.read_txn()?;
 | 
			
		||||
    let mut sched_wtxn = env.write_txn()?;
 | 
			
		||||
 | 
			
		||||
    let index_mapping: Database<Str, UuidCodec> =
 | 
			
		||||
        try_opening_database(&env, &sched_rtxn, "index-mapping")?;
 | 
			
		||||
        try_opening_database(&env, &sched_wtxn, "index-mapping")?;
 | 
			
		||||
    let stats_db: Database<UuidCodec, SerdeJson<IndexStats>> =
 | 
			
		||||
        try_opening_database(&env, &sched_wtxn, "index-stats").with_context(|| {
 | 
			
		||||
            format!("While trying to open {:?}", index_scheduler_path.display())
 | 
			
		||||
        })?;
 | 
			
		||||
 | 
			
		||||
    let index_count =
 | 
			
		||||
        index_mapping.len(&sched_rtxn).context("while reading the number of indexes")?;
 | 
			
		||||
        index_mapping.len(&sched_wtxn).context("while reading the number of indexes")?;
 | 
			
		||||
 | 
			
		||||
    // FIXME: not ideal, we have to pre-populate all indexes to prevent double borrow of sched_wtxn
 | 
			
		||||
    // 1. immutably for the iteration
 | 
			
		||||
    // 2. mutably for updating index stats
 | 
			
		||||
    let indexes: Vec<_> = index_mapping
 | 
			
		||||
        .iter(&sched_wtxn)?
 | 
			
		||||
        .map(|res| res.map(|(uid, uuid)| (uid.to_owned(), uuid)))
 | 
			
		||||
        .collect();
 | 
			
		||||
 | 
			
		||||
    let progress = milli::progress::Progress::default();
 | 
			
		||||
    let finished = AtomicBool::new(false);
 | 
			
		||||
 | 
			
		||||
    std::thread::scope(|scope| {
 | 
			
		||||
        let indexes = index_mapping.iter(&sched_rtxn)?;
 | 
			
		||||
 | 
			
		||||
        let display_progress = std::thread::Builder::new()
 | 
			
		||||
            .name("display_progress".into())
 | 
			
		||||
            .spawn_scoped(scope, || {
 | 
			
		||||
@@ -128,10 +142,10 @@ fn rebuild_field_distribution(db_path: &Path) -> anyhow::Result<()> {
 | 
			
		||||
            })
 | 
			
		||||
            .unwrap();
 | 
			
		||||
 | 
			
		||||
        for (index_index, result) in indexes.enumerate() {
 | 
			
		||||
        for (index_index, result) in indexes.into_iter().enumerate() {
 | 
			
		||||
            let (uid, uuid) = result?;
 | 
			
		||||
            progress.update_progress(VariableNameStep::new(
 | 
			
		||||
                uid,
 | 
			
		||||
                &uid,
 | 
			
		||||
                index_index as u32,
 | 
			
		||||
                index_count as u32,
 | 
			
		||||
            ));
 | 
			
		||||
@@ -155,10 +169,14 @@ fn rebuild_field_distribution(db_path: &Path) -> anyhow::Result<()> {
 | 
			
		||||
            milli::update::new::reindex::field_distribution(&index, &mut index_txn, &progress)
 | 
			
		||||
                .context("while rebuilding field distribution")?;
 | 
			
		||||
 | 
			
		||||
            let stats = IndexStats::new(&index, &index_txn)
 | 
			
		||||
                .with_context(|| format!("computing stats for index `{uid}`"))?;
 | 
			
		||||
            store_stats_of(stats_db, uuid, &mut sched_wtxn, &uid, &stats)?;
 | 
			
		||||
 | 
			
		||||
            index_txn.commit().context("while committing the write txn for the updated index")?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        sched_rtxn.commit().context("while committing the write txn for the index-scheduler")?;
 | 
			
		||||
        sched_wtxn.commit().context("while committing the write txn for the index-scheduler")?;
 | 
			
		||||
 | 
			
		||||
        finished.store(true, std::sync::atomic::Ordering::Relaxed);
 | 
			
		||||
 | 
			
		||||
@@ -203,3 +221,60 @@ impl Step for VariableNameStep {
 | 
			
		||||
        self.total
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub fn store_stats_of(
 | 
			
		||||
    stats_db: Database<UuidCodec, SerdeJson<IndexStats>>,
 | 
			
		||||
    index_uuid: Uuid,
 | 
			
		||||
    sched_wtxn: &mut RwTxn,
 | 
			
		||||
    index_uid: &str,
 | 
			
		||||
    stats: &IndexStats,
 | 
			
		||||
) -> anyhow::Result<()> {
 | 
			
		||||
    stats_db
 | 
			
		||||
        .put(sched_wtxn, &index_uuid, stats)
 | 
			
		||||
        .with_context(|| format!("storing stats for index `{index_uid}`"))?;
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// The statistics that can be computed from an `Index` object.
 | 
			
		||||
#[derive(Serialize, Debug)]
 | 
			
		||||
pub struct IndexStats {
 | 
			
		||||
    /// Number of documents in the index.
 | 
			
		||||
    pub number_of_documents: u64,
 | 
			
		||||
    /// Size taken up by the index' DB, in bytes.
 | 
			
		||||
    ///
 | 
			
		||||
    /// This includes the size taken by both the used and free pages of the DB, and as the free pages
 | 
			
		||||
    /// are not returned to the disk after a deletion, this number is typically larger than
 | 
			
		||||
    /// `used_database_size` that only includes the size of the used pages.
 | 
			
		||||
    pub database_size: u64,
 | 
			
		||||
    /// Size taken by the used pages of the index' DB, in bytes.
 | 
			
		||||
    ///
 | 
			
		||||
    /// As the DB backend does not return to the disk the pages that are not currently used by the DB,
 | 
			
		||||
    /// this value is typically smaller than `database_size`.
 | 
			
		||||
    pub used_database_size: u64,
 | 
			
		||||
    /// Association of every field name with the number of times it occurs in the documents.
 | 
			
		||||
    pub field_distribution: FieldDistribution,
 | 
			
		||||
    /// Creation date of the index.
 | 
			
		||||
    #[serde(with = "time::serde::rfc3339")]
 | 
			
		||||
    pub created_at: OffsetDateTime,
 | 
			
		||||
    /// Date of the last update of the index.
 | 
			
		||||
    #[serde(with = "time::serde::rfc3339")]
 | 
			
		||||
    pub updated_at: OffsetDateTime,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
impl IndexStats {
 | 
			
		||||
    /// Compute the stats of an index
 | 
			
		||||
    ///
 | 
			
		||||
    /// # Parameters
 | 
			
		||||
    ///
 | 
			
		||||
    /// - rtxn: a RO transaction for the index, obtained from `Index::read_txn()`.
 | 
			
		||||
    pub fn new(index: &Index, rtxn: &RoTxn) -> milli::Result<Self> {
 | 
			
		||||
        Ok(IndexStats {
 | 
			
		||||
            number_of_documents: index.number_of_documents(rtxn)?,
 | 
			
		||||
            database_size: index.on_disk_size()?,
 | 
			
		||||
            used_database_size: index.used_size()?,
 | 
			
		||||
            field_distribution: index.field_distribution(rtxn)?,
 | 
			
		||||
            created_at: index.created_at(rtxn)?,
 | 
			
		||||
            updated_at: index.updated_at(rtxn)?,
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user