mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-25 04:56:28 +00:00 
			
		
		
		
	feat(http): stats route
This commit is contained in:
		| @@ -1,6 +1,3 @@ | ||||
| pub mod search; | ||||
| mod updates; | ||||
|  | ||||
| use std::ops::Deref; | ||||
| use std::sync::Arc; | ||||
|  | ||||
| @@ -11,6 +8,9 @@ use crate::index_controller::IndexController; | ||||
| use crate::index_controller::{IndexMetadata, IndexSettings}; | ||||
| use crate::option::Opt; | ||||
|  | ||||
| pub mod search; | ||||
| mod updates; | ||||
|  | ||||
| #[derive(Clone)] | ||||
| pub struct Data { | ||||
|     inner: Arc<DataInner>, | ||||
|   | ||||
| @@ -1,6 +1,3 @@ | ||||
| mod search; | ||||
| mod updates; | ||||
|  | ||||
| use std::collections::{BTreeSet, HashSet}; | ||||
| use std::ops::Deref; | ||||
| use std::sync::Arc; | ||||
| @@ -8,10 +5,14 @@ use std::sync::Arc; | ||||
| use anyhow::{bail, Context}; | ||||
| use milli::obkv_to_json; | ||||
| use serde_json::{Map, Value}; | ||||
| use walkdir::WalkDir; | ||||
|  | ||||
| pub use search::{SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT}; | ||||
| pub use updates::{Facets, Settings, UpdateResult}; | ||||
|  | ||||
| mod search; | ||||
| mod updates; | ||||
|  | ||||
| pub type Document = Map<String, Value>; | ||||
|  | ||||
| #[derive(Clone)] | ||||
| @@ -126,6 +127,15 @@ impl Index { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     pub fn size(&self) -> anyhow::Result<u64> { | ||||
|         Ok(WalkDir::new(self.env.path()) | ||||
|             .into_iter() | ||||
|             .filter_map(|entry| entry.ok()) | ||||
|             .filter_map(|entry| entry.metadata().ok()) | ||||
|             .filter(|metadata| metadata.is_file()) | ||||
|             .fold(0, |acc, m| acc + m.len())) | ||||
|     } | ||||
|  | ||||
|     fn fields_to_display<S: AsRef<str>>( | ||||
|         &self, | ||||
|         txn: &heed::RoTxn, | ||||
|   | ||||
| @@ -12,12 +12,15 @@ use tokio::sync::mpsc; | ||||
| use tokio::task::spawn_blocking; | ||||
| use uuid::Uuid; | ||||
|  | ||||
| use super::{IndexError, IndexMeta, IndexMsg, IndexSettings, IndexStore, Result, UpdateResult}; | ||||
| use crate::index::{Document, SearchQuery, SearchResult, Settings}; | ||||
| use crate::index_controller::update_handler::UpdateHandler; | ||||
| use crate::index_controller::{get_arc_ownership_blocking, updates::Processing, UpdateMeta}; | ||||
| use crate::index_controller::{ | ||||
|     get_arc_ownership_blocking, updates::Processing, IndexStats, UpdateMeta, | ||||
| }; | ||||
| use crate::option::IndexerOpts; | ||||
|  | ||||
| use super::{IndexError, IndexMeta, IndexMsg, IndexSettings, IndexStore, Result, UpdateResult}; | ||||
|  | ||||
| pub struct IndexActor<S> { | ||||
|     read_receiver: Option<mpsc::Receiver<IndexMsg>>, | ||||
|     write_receiver: Option<mpsc::Receiver<IndexMsg>>, | ||||
| @@ -146,6 +149,9 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> { | ||||
|             Snapshot { uuid, path, ret } => { | ||||
|                 let _ = ret.send(self.handle_snapshot(uuid, path).await); | ||||
|             } | ||||
|             GetStats { uuid, ret } => { | ||||
|                 let _ = ret.send(self.handle_get_stats(uuid).await); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
|  | ||||
| @@ -328,4 +334,25 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> { | ||||
|  | ||||
|         Ok(()) | ||||
|     } | ||||
|  | ||||
|     async fn handle_get_stats(&self, uuid: Uuid) -> Result<IndexStats> { | ||||
|         let index = self | ||||
|             .store | ||||
|             .get(uuid) | ||||
|             .await? | ||||
|             .ok_or(IndexError::UnexistingIndex)?; | ||||
|  | ||||
|         spawn_blocking(move || { | ||||
|             let rtxn = index.read_txn()?; | ||||
|  | ||||
|             Ok(IndexStats { | ||||
|                 size: index.size()?, | ||||
|                 number_of_documents: index.number_of_documents(&rtxn)?, | ||||
|                 is_indexing: false, // TODO check actual is_indexing | ||||
|                 fields_distribution: index.fields_distribution(&rtxn)?, | ||||
|             }) | ||||
|         }) | ||||
|         .await | ||||
|         .map_err(|e| IndexError::Error(e.into()))? | ||||
|     } | ||||
| } | ||||
|   | ||||
| @@ -3,12 +3,13 @@ use std::path::{Path, PathBuf}; | ||||
| use tokio::sync::{mpsc, oneshot}; | ||||
| use uuid::Uuid; | ||||
|  | ||||
| use crate::index::{Document, SearchQuery, SearchResult, Settings}; | ||||
| use crate::index_controller::{updates::Processing, UpdateMeta}; | ||||
| use crate::index_controller::{IndexSettings, IndexStats}; | ||||
|  | ||||
| use super::{ | ||||
|     IndexActor, IndexActorHandle, IndexMeta, IndexMsg, MapIndexStore, Result, UpdateResult, | ||||
| }; | ||||
| use crate::index::{Document, SearchQuery, SearchResult, Settings}; | ||||
| use crate::index_controller::IndexSettings; | ||||
| use crate::index_controller::{updates::Processing, UpdateMeta}; | ||||
|  | ||||
| #[derive(Clone)] | ||||
| pub struct IndexActorHandleImpl { | ||||
| @@ -121,6 +122,13 @@ impl IndexActorHandle for IndexActorHandleImpl { | ||||
|         let _ = self.read_sender.send(msg).await; | ||||
|         Ok(receiver.await.expect("IndexActor has been killed")?) | ||||
|     } | ||||
|  | ||||
|     async fn get_index_stats(&self, uuid: Uuid) -> Result<IndexStats> { | ||||
|         let (ret, receiver) = oneshot::channel(); | ||||
|         let msg = IndexMsg::GetStats { uuid, ret }; | ||||
|         let _ = self.read_sender.send(msg).await; | ||||
|         Ok(receiver.await.expect("IndexActor has been killed")?) | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl IndexActorHandleImpl { | ||||
|   | ||||
| @@ -3,9 +3,10 @@ use std::path::PathBuf; | ||||
| use tokio::sync::oneshot; | ||||
| use uuid::Uuid; | ||||
|  | ||||
| use super::{IndexMeta, IndexSettings, Result, UpdateResult}; | ||||
| use crate::index::{Document, SearchQuery, SearchResult, Settings}; | ||||
| use crate::index_controller::{updates::Processing, UpdateMeta}; | ||||
| use crate::index_controller::{updates::Processing, IndexStats, UpdateMeta}; | ||||
|  | ||||
| use super::{IndexMeta, IndexSettings, Result, UpdateResult}; | ||||
|  | ||||
| pub enum IndexMsg { | ||||
|     CreateIndex { | ||||
| @@ -58,4 +59,8 @@ pub enum IndexMsg { | ||||
|         path: PathBuf, | ||||
|         ret: oneshot::Sender<Result<()>>, | ||||
|     }, | ||||
|     GetStats { | ||||
|         uuid: Uuid, | ||||
|         ret: oneshot::Sender<Result<IndexStats>>, | ||||
|     }, | ||||
| } | ||||
|   | ||||
| @@ -1,30 +1,30 @@ | ||||
| mod actor; | ||||
| mod handle_impl; | ||||
| mod message; | ||||
| mod store; | ||||
|  | ||||
| use std::path::PathBuf; | ||||
|  | ||||
| use chrono::{DateTime, Utc}; | ||||
| #[cfg(test)] | ||||
| use mockall::automock; | ||||
| use serde::{Deserialize, Serialize}; | ||||
| use thiserror::Error; | ||||
| use uuid::Uuid; | ||||
|  | ||||
| use super::IndexSettings; | ||||
| use actor::IndexActor; | ||||
| pub use handle_impl::IndexActorHandleImpl; | ||||
| use message::IndexMsg; | ||||
| use store::{IndexStore, MapIndexStore}; | ||||
|  | ||||
| use crate::index::UpdateResult as UResult; | ||||
| use crate::index::{Document, Index, SearchQuery, SearchResult, Settings}; | ||||
| use crate::index_controller::{ | ||||
|     updates::{Failed, Processed, Processing}, | ||||
|     UpdateMeta, | ||||
|     IndexStats, UpdateMeta, | ||||
| }; | ||||
| use actor::IndexActor; | ||||
| use message::IndexMsg; | ||||
| use store::{IndexStore, MapIndexStore}; | ||||
|  | ||||
| pub use handle_impl::IndexActorHandleImpl; | ||||
| use super::IndexSettings; | ||||
|  | ||||
| #[cfg(test)] | ||||
| use mockall::automock; | ||||
| mod actor; | ||||
| mod handle_impl; | ||||
| mod message; | ||||
| mod store; | ||||
|  | ||||
| pub type Result<T> = std::result::Result<T, IndexError>; | ||||
| type UpdateResult = std::result::Result<Processed<UpdateMeta, UResult>, Failed<UpdateMeta, String>>; | ||||
| @@ -33,7 +33,7 @@ type UpdateResult = std::result::Result<Processed<UpdateMeta, UResult>, Failed<U | ||||
| #[serde(rename_all = "camelCase")] | ||||
| pub struct IndexMeta { | ||||
|     created_at: DateTime<Utc>, | ||||
|     updated_at: DateTime<Utc>, | ||||
|     pub updated_at: DateTime<Utc>, | ||||
|     primary_key: Option<String>, | ||||
| } | ||||
|  | ||||
| @@ -98,4 +98,5 @@ pub trait IndexActorHandle { | ||||
|     async fn get_index_meta(&self, uuid: Uuid) -> Result<IndexMeta>; | ||||
|     async fn update_index(&self, uuid: Uuid, index_settings: IndexSettings) -> Result<IndexMeta>; | ||||
|     async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()>; | ||||
|     async fn get_index_stats(&self, uuid: Uuid) -> Result<IndexStats>; | ||||
| } | ||||
|   | ||||
| @@ -1,10 +1,3 @@ | ||||
| mod index_actor; | ||||
| mod snapshot; | ||||
| mod update_actor; | ||||
| mod update_handler; | ||||
| mod updates; | ||||
| mod uuid_resolver; | ||||
|  | ||||
| use std::path::Path; | ||||
| use std::sync::Arc; | ||||
| use std::time::Duration; | ||||
| @@ -14,33 +7,40 @@ use anyhow::bail; | ||||
| use futures::stream::StreamExt; | ||||
| use log::info; | ||||
| use milli::update::{IndexDocumentsMethod, UpdateFormat}; | ||||
| use milli::FieldsDistribution; | ||||
| use serde::{Deserialize, Serialize}; | ||||
| use tokio::sync::mpsc; | ||||
| use tokio::time::sleep; | ||||
| use uuid::Uuid; | ||||
|  | ||||
| use index_actor::IndexActorHandle; | ||||
| use snapshot::load_snapshot; | ||||
| use snapshot::SnapshotService; | ||||
| use update_actor::UpdateActorHandle; | ||||
| pub use updates::{Failed, Processed, Processing}; | ||||
| use uuid_resolver::UuidError; | ||||
| use uuid_resolver::UuidResolverHandle; | ||||
|  | ||||
| use crate::index::{Document, SearchQuery, SearchResult}; | ||||
| use crate::index::{Facets, Settings, UpdateResult}; | ||||
| use crate::option::Opt; | ||||
|  | ||||
| use index_actor::IndexActorHandle; | ||||
| use snapshot::load_snapshot; | ||||
| use update_actor::UpdateActorHandle; | ||||
| use uuid_resolver::UuidResolverHandle; | ||||
|  | ||||
| use snapshot::SnapshotService; | ||||
| pub use updates::{Failed, Processed, Processing}; | ||||
| use uuid_resolver::UuidError; | ||||
| mod index_actor; | ||||
| mod snapshot; | ||||
| mod update_actor; | ||||
| mod update_handler; | ||||
| mod updates; | ||||
| mod uuid_resolver; | ||||
|  | ||||
| pub type UpdateStatus = updates::UpdateStatus<UpdateMeta, UpdateResult, String>; | ||||
|  | ||||
| #[derive(Debug, Serialize, Deserialize, Clone)] | ||||
| #[serde(rename_all = "camelCase")] | ||||
| pub struct IndexMetadata { | ||||
|     uid: String, | ||||
|     pub uid: String, | ||||
|     name: String, | ||||
|     #[serde(flatten)] | ||||
|     meta: index_actor::IndexMeta, | ||||
|     pub meta: index_actor::IndexMeta, | ||||
| } | ||||
|  | ||||
| #[derive(Debug, Clone, Serialize, Deserialize)] | ||||
| @@ -63,6 +63,14 @@ pub struct IndexSettings { | ||||
|     pub primary_key: Option<String>, | ||||
| } | ||||
|  | ||||
| #[derive(Clone, Debug)] | ||||
| pub struct IndexStats { | ||||
|     pub size: u64, | ||||
|     pub number_of_documents: u64, | ||||
|     pub is_indexing: bool, | ||||
|     pub fields_distribution: FieldsDistribution, | ||||
| } | ||||
|  | ||||
| pub struct IndexController { | ||||
|     uuid_resolver: uuid_resolver::UuidResolverHandleImpl, | ||||
|     index_handle: index_actor::IndexActorHandleImpl, | ||||
| @@ -100,10 +108,11 @@ impl IndexController { | ||||
|                 update_handle.clone(), | ||||
|                 Duration::from_secs(options.snapshot_interval_sec), | ||||
|                 options.snapshot_dir.clone(), | ||||
|                 options.db_path | ||||
|                 .file_name() | ||||
|                 .map(|n| n.to_owned().into_string().expect("invalid path")) | ||||
|                 .unwrap_or_else(|| String::from("data.ms")), | ||||
|                 options | ||||
|                     .db_path | ||||
|                     .file_name() | ||||
|                     .map(|n| n.to_owned().into_string().expect("invalid path")) | ||||
|                     .unwrap_or_else(|| String::from("data.ms")), | ||||
|             ); | ||||
|  | ||||
|             tokio::task::spawn(snapshot_service.run()); | ||||
| @@ -341,6 +350,12 @@ impl IndexController { | ||||
|         }; | ||||
|         Ok(meta) | ||||
|     } | ||||
|  | ||||
|     pub async fn get_stats(&self, uid: String) -> anyhow::Result<IndexStats> { | ||||
|         let uuid = self.uuid_resolver.get(uid.clone()).await?; | ||||
|  | ||||
|         Ok(self.index_handle.get_index_stats(uuid).await?) | ||||
|     } | ||||
| } | ||||
|  | ||||
| pub async fn get_arc_ownership_blocking<T>(mut item: Arc<T>) -> T { | ||||
|   | ||||
| @@ -1,18 +1,20 @@ | ||||
| use std::collections::{BTreeMap, HashMap}; | ||||
| use std::collections::HashMap; | ||||
|  | ||||
| use actix_web::get; | ||||
| use actix_web::web; | ||||
| use actix_web::HttpResponse; | ||||
| use chrono::{DateTime, Utc}; | ||||
| use milli::FieldsDistribution; | ||||
| use serde::Serialize; | ||||
|  | ||||
| use crate::error::ResponseError; | ||||
| use crate::helpers::Authentication; | ||||
| use crate::index_controller::IndexStats; | ||||
| use crate::routes::IndexParam; | ||||
| use crate::Data; | ||||
|  | ||||
| pub fn services(cfg: &mut web::ServiceConfig) { | ||||
|     cfg.service(index_stats) | ||||
|     cfg.service(get_index_stats) | ||||
|         .service(get_stats) | ||||
|         .service(get_version); | ||||
| } | ||||
| @@ -22,28 +24,61 @@ pub fn services(cfg: &mut web::ServiceConfig) { | ||||
| struct IndexStatsResponse { | ||||
|     number_of_documents: u64, | ||||
|     is_indexing: bool, | ||||
|     fields_distribution: BTreeMap<String, usize>, | ||||
|     fields_distribution: FieldsDistribution, | ||||
| } | ||||
|  | ||||
| impl From<IndexStats> for IndexStatsResponse { | ||||
|     fn from(stats: IndexStats) -> Self { | ||||
|         Self { | ||||
|             number_of_documents: stats.number_of_documents, | ||||
|             is_indexing: stats.is_indexing, | ||||
|             fields_distribution: stats.fields_distribution, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| #[get("/indexes/{index_uid}/stats", wrap = "Authentication::Private")] | ||||
| async fn index_stats( | ||||
|     _data: web::Data<Data>, | ||||
|     _path: web::Path<IndexParam>, | ||||
| async fn get_index_stats( | ||||
|     data: web::Data<Data>, | ||||
|     path: web::Path<IndexParam>, | ||||
| ) -> Result<HttpResponse, ResponseError> { | ||||
|     todo!() | ||||
|     let response: IndexStatsResponse = data | ||||
|         .index_controller | ||||
|         .get_stats(path.index_uid.clone()) | ||||
|         .await? | ||||
|         .into(); | ||||
|  | ||||
|     Ok(HttpResponse::Ok().json(response)) | ||||
| } | ||||
|  | ||||
| #[derive(Serialize)] | ||||
| #[serde(rename_all = "camelCase")] | ||||
| struct StatsResult { | ||||
| struct StatsResponse { | ||||
|     database_size: u64, | ||||
|     last_update: Option<DateTime<Utc>>, | ||||
|     indexes: HashMap<String, IndexStatsResponse>, | ||||
| } | ||||
|  | ||||
| #[get("/stats", wrap = "Authentication::Private")] | ||||
| async fn get_stats(_data: web::Data<Data>) -> Result<HttpResponse, ResponseError> { | ||||
|     todo!() | ||||
| async fn get_stats(data: web::Data<Data>) -> Result<HttpResponse, ResponseError> { | ||||
|     let mut response = StatsResponse { | ||||
|         database_size: 0, | ||||
|         last_update: None, | ||||
|         indexes: HashMap::new(), | ||||
|     }; | ||||
|  | ||||
|     for index in data.index_controller.list_indexes().await? { | ||||
|         let stats = data.index_controller.get_stats(index.uid.clone()).await?; | ||||
|  | ||||
|         response.database_size += stats.size; | ||||
|         response.last_update = Some(match response.last_update { | ||||
|             Some(last_update) => last_update.max(index.meta.updated_at), | ||||
|             None => index.meta.updated_at, | ||||
|         }); | ||||
|         response.indexes.insert(index.uid, stats.into()); | ||||
|     } | ||||
|  | ||||
|     Ok(HttpResponse::Ok().json(response)) | ||||
| } | ||||
|  | ||||
| #[derive(Serialize)] | ||||
| @@ -58,11 +93,11 @@ struct VersionResponse { | ||||
| async fn get_version() -> HttpResponse { | ||||
|     let commit_sha = match option_env!("COMMIT_SHA") { | ||||
|         Some("") | None => env!("VERGEN_SHA"), | ||||
|         Some(commit_sha) => commit_sha | ||||
|         Some(commit_sha) => commit_sha, | ||||
|     }; | ||||
|     let commit_date = match option_env!("COMMIT_DATE") { | ||||
|         Some("") | None => env!("VERGEN_COMMIT_DATE"), | ||||
|         Some(commit_date) => commit_date | ||||
|         Some(commit_date) => commit_date, | ||||
|     }; | ||||
|  | ||||
|     HttpResponse::Ok().json(VersionResponse { | ||||
|   | ||||
		Reference in New Issue
	
	Block a user