update the test with the stats

This commit is contained in:
Tamo
2025-01-20 15:15:45 +01:00
committed by Louis Dureuil
parent 0cc25c7e4c
commit cfc1e193b6
5 changed files with 56 additions and 17 deletions

View File

@@ -19,8 +19,18 @@ impl IndexScheduler {
indexes.len() as u32, indexes.len() as u32,
)); ));
let index = self.index(uid)?; let index = self.index(uid)?;
milli::update::upgrade::upgrade(&index, progress.clone()) let mut wtxn = index.write_txn()?;
let regenerate = milli::update::upgrade::upgrade(&mut wtxn, &index, progress.clone())
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
if regenerate {
let stats = crate::index_mapper::IndexStats::new(&index, &wtxn)
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
// Release wtxn as soon as possible because it stops us from registering tasks
let mut index_schd_wtxn = self.env.write_txn()?;
self.index_mapper.store_stats_of(&mut index_schd_wtxn, uid, &stats)?;
index_schd_wtxn.commit()?;
}
wtxn.commit()?;
} }
Ok(()) Ok(())

View File

@@ -206,6 +206,42 @@ async fn check_the_index_scheduler(server: &Server) {
let (batches, _) = server.batches_filter("afterFinishedAt=2025-01-16T16:47:41Z").await; let (batches, _) = server.batches_filter("afterFinishedAt=2025-01-16T16:47:41Z").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]" }), name: "batches_filter_afterFinishedAt_equal_2025-01-16T16:47:41"); snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]" }), name: "batches_filter_afterFinishedAt_equal_2025-01-16T16:47:41");
let (stats, _) = server.stats().await;
snapshot!(stats, @r#"
{
"databaseSize": 425984,
"lastUpdate": "2025-01-16T17:18:43.296777845Z",
"indexes": {
"kefir": {
"numberOfDocuments": 1,
"isIndexing": false,
"fieldDistribution": {
"age": 1,
"description": 1,
"id": 1,
"name": 1,
"surname": 1
}
}
}
}
"#);
let index = server.index("kefir");
let (stats, _) = index.stats().await;
snapshot!(stats, @r#"
{
"numberOfDocuments": 1,
"isIndexing": false,
"fieldDistribution": {
"age": 1,
"description": 1,
"id": 1,
"name": 1,
"surname": 1
}
}
"#);
// Delete all the tasks of a specific batch // Delete all the tasks of a specific batch
let (task, _) = server.delete_tasks("batchUids=10").await; let (task, _) = server.delete_tasks("batchUids=10").await;
server.wait_task(task.uid()).await.succeeded(); server.wait_task(task.uid()).await.succeeded();
@@ -218,15 +254,6 @@ async fn check_the_index_scheduler(server: &Server) {
let index = server.index("kefirausaurus"); let index = server.index("kefirausaurus");
let (task, _) = index.create(Some("kefid")).await; let (task, _) = index.create(Some("kefid")).await;
server.wait_task(task.uid()).await.succeeded(); server.wait_task(task.uid()).await.succeeded();
let (stats, _) = index.stats().await;
snapshot!(stats, @r#"
{
"numberOfDocuments": 0,
"isIndexing": false,
"fieldDistribution": {}
}
"#);
} }
/// Ensuring the index roughly works with filter and sort. /// Ensuring the index roughly works with filter and sort.

View File

@@ -1,17 +1,19 @@
use heed::RwTxn;
use crate::progress::{Progress, VariableNameStep}; use crate::progress::{Progress, VariableNameStep};
use crate::{Index, InternalError, Result}; use crate::{Index, InternalError, Result};
pub fn upgrade(index: &Index, progress: Progress) -> Result<()> { /// Return true if the cached stats of the index must be regenerated
let wtxn = index.env.write_txn()?; pub fn upgrade(wtxn: &mut RwTxn, index: &Index, progress: Progress) -> Result<bool> {
let from = index.get_version(&wtxn)?; let from = index.get_version(wtxn)?;
let upgrade_functions = let upgrade_functions =
[(v1_12_to_v1_13 as fn(&Index, Progress) -> Result<()>, "Upgrading from v1.12 to v1.13")]; [(v1_12_to_v1_13 as fn(&Index, Progress) -> Result<()>, "Upgrading from v1.12 to v1.13")];
let start = match from { let (start, regenerate_stats) = match from {
// If there was no version it means we're coming from the v1.12 // If there was no version it means we're coming from the v1.12
None | Some((1, 12, _)) => 0, None | Some((1, 12, _)) => (0, false),
// We must handle the current version in the match because in case of a failure some index may have been upgraded but not other. // We must handle the current version in the match because in case of a failure some index may have been upgraded but not other.
Some((1, 13, _)) => return Ok(()), Some((1, 13, _)) => return Ok(false),
Some((major, minor, patch)) => { Some((major, minor, patch)) => {
return Err(InternalError::CannotUpgradeToVersion(major, minor, patch).into()) return Err(InternalError::CannotUpgradeToVersion(major, minor, patch).into())
} }
@@ -29,7 +31,7 @@ pub fn upgrade(index: &Index, progress: Progress) -> Result<()> {
(upgrade_function)(index, progress.clone())?; (upgrade_function)(index, progress.clone())?;
} }
Ok(()) Ok(regenerate_stats)
} }
fn v1_12_to_v1_13(_index: &Index, _progress: Progress) -> Result<()> { fn v1_12_to_v1_13(_index: &Index, _progress: Progress) -> Result<()> {