Mirror of https://github.com/meilisearch/meilisearch.git (synced 2025-09-20 19:56:25 +00:00)
Merge branch 'main' into fix-3037
@@ -5,8 +5,9 @@ use actix_web::HttpRequest;
use meilisearch_types::InstanceUid;
use serde_json::Value;

use super::{find_user_id, Analytics};
use super::{find_user_id, Analytics, DocumentDeletionKind};
use crate::routes::indexes::documents::UpdateDocumentsQuery;
use crate::routes::tasks::TasksFilterQueryRaw;
use crate::Opt;

pub struct MockAnalytics {
@@ -49,6 +50,7 @@ impl Analytics for MockAnalytics {
        _request: &HttpRequest,
    ) {
    }
    fn delete_documents(&self, _kind: DocumentDeletionKind, _request: &HttpRequest) {}
    fn update_documents(
        &self,
        _documents_query: &UpdateDocumentsQuery,
@@ -56,4 +58,6 @@ impl Analytics for MockAnalytics {
        _request: &HttpRequest,
    ) {
    }
    fn get_tasks(&self, _query: &TasksFilterQueryRaw, _request: &HttpRequest) {}
    fn health_seen(&self, _request: &HttpRequest) {}
}

@@ -15,6 +15,7 @@ use platform_dirs::AppDirs;
use serde_json::Value;

use crate::routes::indexes::documents::UpdateDocumentsQuery;
use crate::routes::tasks::TasksFilterQueryRaw;

// if we are in debug mode OR the analytics feature is disabled
// the `SegmentAnalytics` point to the mock instead of the real analytics
@@ -54,6 +55,13 @@ fn find_user_id(db_path: &Path) -> Option<InstanceUid> {
        .and_then(|uid| InstanceUid::from_str(&uid).ok())
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum DocumentDeletionKind {
    PerDocumentId,
    ClearAll,
    PerBatch,
}

pub trait Analytics: Sync + Send {
    fn instance_uid(&self) -> Option<&InstanceUid>;

@@ -73,6 +81,10 @@ pub trait Analytics: Sync + Send {
        index_creation: bool,
        request: &HttpRequest,
    );

    // this method should be called to aggregate a add documents request
    fn delete_documents(&self, kind: DocumentDeletionKind, request: &HttpRequest);

    // this method should be called to batch a update documents request
    fn update_documents(
        &self,
@@ -80,4 +92,10 @@ pub trait Analytics: Sync + Send {
        index_creation: bool,
        request: &HttpRequest,
    );

    // this method should be called to aggregate the get tasks requests.
    fn get_tasks(&self, query: &TasksFilterQueryRaw, request: &HttpRequest);

    // this method should be called to aggregate a add documents request
    fn health_seen(&self, request: &HttpRequest);
}

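The additions to the `Analytics` trait above follow the same pattern as the existing hooks: a route handler turns one request into a small aggregate, and aggregates are merged (counters added, flags OR-ed) before a single event is sent. Below is a minimal, self-contained sketch of that merge logic in plain Rust; it deliberately leaves out actix-web and Segment, and the names other than `DocumentDeletionKind` are illustrative rather than the crate's actual API.

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum DocumentDeletionKind {
    PerDocumentId,
    ClearAll,
    PerBatch,
}

#[derive(Default, Debug)]
struct DeletionAggregate {
    total_received: usize,
    per_document_id: bool,
    clear_all: bool,
    per_batch: bool,
}

impl DeletionAggregate {
    // One aggregate per incoming request (illustrative stand-in for `from_query`).
    fn from_kind(kind: DocumentDeletionKind) -> Self {
        let mut ret = Self { total_received: 1, ..Self::default() };
        match kind {
            DocumentDeletionKind::PerDocumentId => ret.per_document_id = true,
            DocumentDeletionKind::ClearAll => ret.clear_all = true,
            DocumentDeletionKind::PerBatch => ret.per_batch = true,
        }
        ret
    }

    // Merge another aggregate into this one: counters add, flags OR.
    fn aggregate(&mut self, other: Self) {
        self.total_received = self.total_received.saturating_add(other.total_received);
        self.per_document_id |= other.per_document_id;
        self.clear_all |= other.clear_all;
        self.per_batch |= other.per_batch;
    }
}

fn main() {
    let mut running = DeletionAggregate::default();
    for kind in [
        DocumentDeletionKind::PerDocumentId,
        DocumentDeletionKind::PerBatch,
        DocumentDeletionKind::PerDocumentId,
    ] {
        running.aggregate(DeletionAggregate::from_kind(kind));
    }
    // total_received = 3, per_document_id = true, per_batch = true, clear_all = false
    println!("{running:?}");
}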
@@ -6,6 +6,7 @@ use std::time::{Duration, Instant};

use actix_web::http::header::USER_AGENT;
use actix_web::HttpRequest;
use byte_unit::Byte;
use http::header::CONTENT_TYPE;
use index_scheduler::IndexScheduler;
use meilisearch_auth::SearchRules;
@@ -14,6 +15,7 @@ use once_cell::sync::Lazy;
use regex::Regex;
use segment::message::{Identify, Track, User};
use segment::{AutoBatcher, Batcher, HttpClient};
use serde::Serialize;
use serde_json::{json, Value};
use sysinfo::{DiskExt, System, SystemExt};
use time::OffsetDateTime;
@@ -21,10 +23,11 @@ use tokio::select;
use tokio::sync::mpsc::{self, Receiver, Sender};
use uuid::Uuid;

use super::{config_user_id_path, MEILISEARCH_CONFIG_PATH};
use super::{config_user_id_path, DocumentDeletionKind, MEILISEARCH_CONFIG_PATH};
use crate::analytics::Analytics;
use crate::option::default_http_addr;
use crate::option::{default_http_addr, IndexerOpts, MaxMemory, MaxThreads, SchedulerConfig};
use crate::routes::indexes::documents::UpdateDocumentsQuery;
use crate::routes::tasks::TasksFilterQueryRaw;
use crate::routes::{create_all_stats, Stats};
use crate::search::{
    SearchQuery, SearchResult, DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER,
@@ -66,7 +69,10 @@ pub enum AnalyticsMsg {
    AggregateGetSearch(SearchAggregator),
    AggregatePostSearch(SearchAggregator),
    AggregateAddDocuments(DocumentsAggregator),
    AggregateDeleteDocuments(DocumentsDeletionAggregator),
    AggregateUpdateDocuments(DocumentsAggregator),
    AggregateTasks(TasksAggregator),
    AggregateHealth(HealthAggregator),
}

pub struct SegmentAnalytics {
@@ -125,7 +131,10 @@ impl SegmentAnalytics {
            post_search_aggregator: SearchAggregator::default(),
            get_search_aggregator: SearchAggregator::default(),
            add_documents_aggregator: DocumentsAggregator::default(),
            delete_documents_aggregator: DocumentsDeletionAggregator::default(),
            update_documents_aggregator: DocumentsAggregator::default(),
            get_tasks_aggregator: TasksAggregator::default(),
            health_aggregator: HealthAggregator::default(),
        });
        tokio::spawn(segment.run(index_scheduler.clone()));

@@ -171,6 +180,11 @@ impl super::Analytics for SegmentAnalytics {
        let _ = self.sender.try_send(AnalyticsMsg::AggregateAddDocuments(aggregate));
    }

    fn delete_documents(&self, kind: DocumentDeletionKind, request: &HttpRequest) {
        let aggregate = DocumentsDeletionAggregator::from_query(kind, request);
        let _ = self.sender.try_send(AnalyticsMsg::AggregateDeleteDocuments(aggregate));
    }

    fn update_documents(
        &self,
        documents_query: &UpdateDocumentsQuery,
@@ -180,6 +194,134 @@ impl super::Analytics for SegmentAnalytics {
        let aggregate = DocumentsAggregator::from_query(documents_query, index_creation, request);
        let _ = self.sender.try_send(AnalyticsMsg::AggregateUpdateDocuments(aggregate));
    }

    fn get_tasks(&self, query: &TasksFilterQueryRaw, request: &HttpRequest) {
        let aggregate = TasksAggregator::from_query(query, request);
        let _ = self.sender.try_send(AnalyticsMsg::AggregateTasks(aggregate));
    }

    fn health_seen(&self, request: &HttpRequest) {
        let aggregate = HealthAggregator::from_query(request);
        let _ = self.sender.try_send(AnalyticsMsg::AggregateHealth(aggregate));
    }
}

/// This structure represent the `infos` field we send in the analytics.
/// It's quite close to the `Opt` structure except all sensitive informations
/// have been simplified to a boolean.
/// It's send as-is in amplitude thus you should never update a name of the
/// struct without the approval of the PM.
#[derive(Debug, Clone, Serialize)]
struct Infos {
    env: String,
    db_path: bool,
    import_dump: bool,
    dumps_dir: bool,
    ignore_missing_dump: bool,
    ignore_dump_if_db_exists: bool,
    import_snapshot: bool,
    schedule_snapshot: bool,
    snapshot_dir: bool,
    snapshot_interval_sec: u64,
    ignore_missing_snapshot: bool,
    ignore_snapshot_if_db_exists: bool,
    http_addr: bool,
    max_index_size: Byte,
    max_task_db_size: Byte,
    http_payload_size_limit: Byte,
    disable_auto_batching: bool,
    log_level: String,
    max_indexing_memory: MaxMemory,
    max_indexing_threads: MaxThreads,
    with_configuration_file: bool,
    ssl_auth_path: bool,
    ssl_cert_path: bool,
    ssl_key_path: bool,
    ssl_ocsp_path: bool,
    ssl_require_auth: bool,
    ssl_resumption: bool,
    ssl_tickets: bool,
}

impl From<Opt> for Infos {
    fn from(options: Opt) -> Self {
        // We wants to decompose this whole struct by hand to be sure we don't forget
        // to add analytics when we add a field in the Opt.
        // Thus we must not insert `..` at the end.
        let Opt {
            db_path,
            http_addr,
            master_key: _,
            env,
            max_index_size,
            max_task_db_size,
            http_payload_size_limit,
            ssl_cert_path,
            ssl_key_path,
            ssl_auth_path,
            ssl_ocsp_path,
            ssl_require_auth,
            ssl_resumption,
            ssl_tickets,
            import_snapshot,
            ignore_missing_snapshot,
            ignore_snapshot_if_db_exists,
            snapshot_dir,
            schedule_snapshot,
            snapshot_interval_sec,
            import_dump,
            ignore_missing_dump,
            ignore_dump_if_db_exists,
            dumps_dir,
            log_level,
            indexer_options,
            scheduler_options,
            config_file_path,
            #[cfg(all(not(debug_assertions), feature = "analytics"))]
            no_analytics: _,
        } = options;

        let SchedulerConfig { disable_auto_batching } = scheduler_options;
        let IndexerOpts {
            log_every_n: _,
            max_nb_chunks: _,
            max_indexing_memory,
            max_indexing_threads,
        } = indexer_options;

        // We're going to override every sensible information.
        // We consider information sensible if it contains a path, an address, or a key.
        Self {
            env,
            db_path: db_path != PathBuf::from("./data.ms"),
            import_dump: import_dump.is_some(),
            dumps_dir: dumps_dir != PathBuf::from("dumps/"),
            ignore_missing_dump,
            ignore_dump_if_db_exists,
            import_snapshot: import_snapshot.is_some(),
            schedule_snapshot,
            snapshot_dir: snapshot_dir != PathBuf::from("snapshots/"),
            snapshot_interval_sec,
            ignore_missing_snapshot,
            ignore_snapshot_if_db_exists,
            http_addr: http_addr != default_http_addr(),
            max_index_size,
            max_task_db_size,
            http_payload_size_limit,
            disable_auto_batching,
            log_level,
            max_indexing_memory,
            max_indexing_threads,
            with_configuration_file: config_file_path.is_some(),
            ssl_auth_path: ssl_auth_path.is_some(),
            ssl_cert_path: ssl_cert_path.is_some(),
            ssl_key_path: ssl_key_path.is_some(),
            ssl_ocsp_path: ssl_ocsp_path.is_some(),
            ssl_require_auth,
            ssl_resumption,
            ssl_tickets,
        }
    }
}

pub struct Segment {
@@ -190,7 +332,10 @@ pub struct Segment {
    get_search_aggregator: SearchAggregator,
    post_search_aggregator: SearchAggregator,
    add_documents_aggregator: DocumentsAggregator,
    delete_documents_aggregator: DocumentsDeletionAggregator,
    update_documents_aggregator: DocumentsAggregator,
    get_tasks_aggregator: TasksAggregator,
    health_aggregator: HealthAggregator,
}

impl Segment {
@@ -212,31 +357,6 @@ impl Segment {
                "server_provider": std::env::var("MEILI_SERVER_PROVIDER").ok(),
            })
        });
        // The infos are all cli option except every option containing sensitive information.
        // We consider an information as sensible if it contains a path, an address or a key.
        let infos = {
            // First we see if any sensitive fields were used.
            let db_path = opt.db_path != PathBuf::from("./data.ms");
            let import_dump = opt.import_dump.is_some();
            let dumps_dir = opt.dumps_dir != PathBuf::from("dumps/");
            let import_snapshot = opt.import_snapshot.is_some();
            let snapshots_dir = opt.snapshot_dir != PathBuf::from("snapshots/");
            let http_addr = opt.http_addr != default_http_addr();

            let mut infos = serde_json::to_value(opt).unwrap();

            // Then we overwrite all sensitive field with a boolean representing if
            // the feature was used or not.
            infos["db_path"] = json!(db_path);
            infos["import_dump"] = json!(import_dump);
            infos["dumps_dir"] = json!(dumps_dir);
            infos["import_snapshot"] = json!(import_snapshot);
            infos["snapshot_dir"] = json!(snapshots_dir);
            infos["http_addr"] = json!(http_addr);

            infos
        };

        let number_of_documents =
            stats.indexes.values().map(|index| index.number_of_documents).collect::<Vec<u64>>();

@@ -248,7 +368,7 @@ impl Segment {
                "indexes_number": stats.indexes.len(),
                "documents_number": number_of_documents,
            },
            "infos": infos,
            "infos": Infos::from(opt.clone()),
        })
    }

@@ -269,7 +389,10 @@ impl Segment {
                Some(AnalyticsMsg::AggregateGetSearch(agreg)) => self.get_search_aggregator.aggregate(agreg),
                Some(AnalyticsMsg::AggregatePostSearch(agreg)) => self.post_search_aggregator.aggregate(agreg),
                Some(AnalyticsMsg::AggregateAddDocuments(agreg)) => self.add_documents_aggregator.aggregate(agreg),
                Some(AnalyticsMsg::AggregateDeleteDocuments(agreg)) => self.delete_documents_aggregator.aggregate(agreg),
                Some(AnalyticsMsg::AggregateUpdateDocuments(agreg)) => self.update_documents_aggregator.aggregate(agreg),
                Some(AnalyticsMsg::AggregateTasks(agreg)) => self.get_tasks_aggregator.aggregate(agreg),
                Some(AnalyticsMsg::AggregateHealth(agreg)) => self.health_aggregator.aggregate(agreg),
                None => (),
            }
        }
@@ -299,8 +422,14 @@ impl Segment {
            .into_event(&self.user, "Documents Searched POST");
        let add_documents = std::mem::take(&mut self.add_documents_aggregator)
            .into_event(&self.user, "Documents Added");
        let delete_documents = std::mem::take(&mut self.delete_documents_aggregator)
            .into_event(&self.user, "Documents Deleted");
        let update_documents = std::mem::take(&mut self.update_documents_aggregator)
            .into_event(&self.user, "Documents Updated");
        let get_tasks =
            std::mem::take(&mut self.get_tasks_aggregator).into_event(&self.user, "Tasks Seen");
        let health =
            std::mem::take(&mut self.health_aggregator).into_event(&self.user, "Health Seen");

        if let Some(get_search) = get_search {
            let _ = self.batcher.push(get_search).await;
@@ -311,9 +440,18 @@ impl Segment {
        if let Some(add_documents) = add_documents {
            let _ = self.batcher.push(add_documents).await;
        }
        if let Some(delete_documents) = delete_documents {
            let _ = self.batcher.push(delete_documents).await;
        }
        if let Some(update_documents) = update_documents {
            let _ = self.batcher.push(update_documents).await;
        }
        if let Some(get_tasks) = get_tasks {
            let _ = self.batcher.push(get_tasks).await;
        }
        if let Some(health) = health {
            let _ = self.batcher.push(health).await;
        }
        let _ = self.batcher.flush().await;
    }
}
@@ -358,11 +496,18 @@ pub struct SearchAggregator {
    finite_pagination: usize,

    // formatting
    max_attributes_to_retrieve: usize,
    max_attributes_to_highlight: usize,
    highlight_pre_tag: bool,
    highlight_post_tag: bool,
    max_attributes_to_crop: usize,
    crop_marker: bool,
    show_matches_position: bool,
    crop_length: bool,

    // facets
    facets_sum_of_terms: usize,
    facets_total_number_of_facets: usize,
}

impl SearchAggregator {
@@ -443,16 +588,19 @@ impl SearchAggregator {
        for user_agent in other.user_agents.into_iter() {
            self.user_agents.insert(user_agent);
        }

        // request
        self.total_received = self.total_received.saturating_add(other.total_received);
        self.total_succeeded = self.total_succeeded.saturating_add(other.total_succeeded);
        self.time_spent.append(&mut other.time_spent);

        // sort
        self.sort_with_geo_point |= other.sort_with_geo_point;
        self.sort_sum_of_criteria_terms =
            self.sort_sum_of_criteria_terms.saturating_add(other.sort_sum_of_criteria_terms);
        self.sort_total_number_of_criteria =
            self.sort_total_number_of_criteria.saturating_add(other.sort_total_number_of_criteria);

        // filter
        self.filter_with_geo_radius |= other.filter_with_geo_radius;
        self.filter_sum_of_criteria_terms =
@@ -467,20 +615,34 @@ impl SearchAggregator {
        // q
        self.max_terms_number = self.max_terms_number.max(other.max_terms_number);

        for (key, value) in other.matching_strategy.into_iter() {
            let matching_strategy = self.matching_strategy.entry(key).or_insert(0);
            *matching_strategy = matching_strategy.saturating_add(value);
        }
        // pagination
        self.max_limit = self.max_limit.max(other.max_limit);
        self.max_offset = self.max_offset.max(other.max_offset);
        self.finite_pagination += other.finite_pagination;

        // formatting
        self.max_attributes_to_retrieve =
            self.max_attributes_to_retrieve.max(other.max_attributes_to_retrieve);
        self.max_attributes_to_highlight =
            self.max_attributes_to_highlight.max(other.max_attributes_to_highlight);
        self.highlight_pre_tag |= other.highlight_pre_tag;
        self.highlight_post_tag |= other.highlight_post_tag;
        self.max_attributes_to_crop = self.max_attributes_to_crop.max(other.max_attributes_to_crop);
        self.crop_marker |= other.crop_marker;
        self.show_matches_position |= other.show_matches_position;
        self.crop_length |= other.crop_length;

        // facets
        self.facets_sum_of_terms =
            self.facets_sum_of_terms.saturating_add(other.facets_sum_of_terms);
        self.facets_total_number_of_facets =
            self.facets_total_number_of_facets.saturating_add(other.facets_total_number_of_facets);

        // matching strategy
        for (key, value) in other.matching_strategy.into_iter() {
            let matching_strategy = self.matching_strategy.entry(key).or_insert(0);
            *matching_strategy = matching_strategy.saturating_add(value);
        }
    }

    pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
@@ -513,20 +675,28 @@ impl SearchAggregator {
            },
            "q": {
                "max_terms_number": self.max_terms_number,
                "most_used_matching_strategy": self.matching_strategy.iter().max_by_key(|(_, v)| *v).map(|(k, _)| json!(k)).unwrap_or_else(|| json!(null)),
            },
            "pagination": {
                "max_limit": self.max_limit,
                "max_offset": self.max_offset,
                "finite_pagination": self.finite_pagination > self.total_received / 2,
                "most_used_navigation": if self.finite_pagination > (self.total_received / 2) { "exhaustive" } else { "estimated" },
            },
            "formatting": {
                "max_attributes_to_retrieve": self.max_attributes_to_retrieve,
                "max_attributes_to_highlight": self.max_attributes_to_highlight,
                "highlight_pre_tag": self.highlight_pre_tag,
                "highlight_post_tag": self.highlight_post_tag,
                "max_attributes_to_crop": self.max_attributes_to_crop,
                "crop_marker": self.crop_marker,
                "show_matches_position": self.show_matches_position,
                "crop_length": self.crop_length,
            },
            "facets": {
                "avg_facets_number": format!("{:.2}", self.facets_sum_of_terms as f64 / self.facets_total_number_of_facets as f64),
            },
            "matching_strategy": {
                "most_used_strategy": self.matching_strategy.iter().max_by_key(|(_, v)| *v).map(|(k, _)| json!(k)).unwrap_or_else(|| json!(null)),
            }
        });

        Some(Track {
@@ -622,3 +792,200 @@ impl DocumentsAggregator {
        }
    }
}

#[derive(Default, Serialize)]
pub struct DocumentsDeletionAggregator {
    #[serde(skip)]
    timestamp: Option<OffsetDateTime>,

    // context
    #[serde(rename = "user-agent")]
    user_agents: HashSet<String>,

    total_received: usize,
    per_document_id: bool,
    clear_all: bool,
    per_batch: bool,
}

impl DocumentsDeletionAggregator {
    pub fn from_query(kind: DocumentDeletionKind, request: &HttpRequest) -> Self {
        let mut ret = Self::default();
        ret.timestamp = Some(OffsetDateTime::now_utc());

        ret.user_agents = extract_user_agents(request).into_iter().collect();
        ret.total_received = 1;
        match kind {
            DocumentDeletionKind::PerDocumentId => ret.per_document_id = true,
            DocumentDeletionKind::ClearAll => ret.clear_all = true,
            DocumentDeletionKind::PerBatch => ret.per_batch = true,
        }

        ret
    }

    /// Aggregate one [DocumentsAggregator] into another.
    pub fn aggregate(&mut self, other: Self) {
        if self.timestamp.is_none() {
            self.timestamp = other.timestamp;
        }

        // we can't create a union because there is no `into_union` method
        for user_agent in other.user_agents {
            self.user_agents.insert(user_agent);
        }
        self.total_received = self.total_received.saturating_add(other.total_received);
        self.per_document_id |= other.per_document_id;
        self.clear_all |= other.clear_all;
        self.per_batch |= other.per_batch;
    }

    pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
        // if we had no timestamp it means we never encountered any events and
        // thus we don't need to send this event.
        let timestamp = self.timestamp?;

        Some(Track {
            timestamp: Some(timestamp),
            user: user.clone(),
            event: event_name.to_string(),
            properties: serde_json::to_value(self).ok()?,
            ..Default::default()
        })
    }
}

#[derive(Default, Serialize)]
pub struct TasksAggregator {
    #[serde(skip)]
    timestamp: Option<OffsetDateTime>,

    // context
    #[serde(rename = "user-agent")]
    user_agents: HashSet<String>,

    filtered_by_uid: bool,
    filtered_by_index_uid: bool,
    filtered_by_type: bool,
    filtered_by_status: bool,
    filtered_by_canceled_by: bool,
    filtered_by_before_enqueued_at: bool,
    filtered_by_after_enqueued_at: bool,
    filtered_by_before_started_at: bool,
    filtered_by_after_started_at: bool,
    filtered_by_before_finished_at: bool,
    filtered_by_after_finished_at: bool,
    total_received: usize,
}

impl TasksAggregator {
    pub fn from_query(query: &TasksFilterQueryRaw, request: &HttpRequest) -> Self {
        Self {
            timestamp: Some(OffsetDateTime::now_utc()),
            user_agents: extract_user_agents(request).into_iter().collect(),
            filtered_by_uid: query.common.uids.is_some(),
            filtered_by_index_uid: query.common.index_uids.is_some(),
            filtered_by_type: query.common.types.is_some(),
            filtered_by_status: query.common.statuses.is_some(),
            filtered_by_canceled_by: query.common.canceled_by.is_some(),
            filtered_by_before_enqueued_at: query.dates.before_enqueued_at.is_some(),
            filtered_by_after_enqueued_at: query.dates.after_enqueued_at.is_some(),
            filtered_by_before_started_at: query.dates.before_started_at.is_some(),
            filtered_by_after_started_at: query.dates.after_started_at.is_some(),
            filtered_by_before_finished_at: query.dates.before_finished_at.is_some(),
            filtered_by_after_finished_at: query.dates.after_finished_at.is_some(),
            total_received: 1,
        }
    }

    /// Aggregate one [DocumentsAggregator] into another.
    pub fn aggregate(&mut self, other: Self) {
        if self.timestamp.is_none() {
            self.timestamp = other.timestamp;
        }

        // we can't create a union because there is no `into_union` method
        for user_agent in other.user_agents {
            self.user_agents.insert(user_agent);
        }

        self.filtered_by_uid |= other.filtered_by_uid;
        self.filtered_by_index_uid |= other.filtered_by_index_uid;
        self.filtered_by_type |= other.filtered_by_type;
        self.filtered_by_status |= other.filtered_by_status;
        self.filtered_by_canceled_by |= other.filtered_by_canceled_by;
        self.filtered_by_before_enqueued_at |= other.filtered_by_before_enqueued_at;
        self.filtered_by_after_enqueued_at |= other.filtered_by_after_enqueued_at;
        self.filtered_by_before_started_at |= other.filtered_by_before_started_at;
        self.filtered_by_after_started_at |= other.filtered_by_after_started_at;
        self.filtered_by_before_finished_at |= other.filtered_by_before_finished_at;
        self.filtered_by_after_finished_at |= other.filtered_by_after_finished_at;
        self.filtered_by_after_finished_at |= other.filtered_by_after_finished_at;

        self.total_received = self.total_received.saturating_add(other.total_received);
    }

    pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
        // if we had no timestamp it means we never encountered any events and
        // thus we don't need to send this event.
        let timestamp = self.timestamp?;

        Some(Track {
            timestamp: Some(timestamp),
            user: user.clone(),
            event: event_name.to_string(),
            properties: serde_json::to_value(self).ok()?,
            ..Default::default()
        })
    }
}

#[derive(Default, Serialize)]
pub struct HealthAggregator {
    #[serde(skip)]
    timestamp: Option<OffsetDateTime>,

    // context
    #[serde(rename = "user-agent")]
    user_agents: HashSet<String>,

    total_received: usize,
}

impl HealthAggregator {
    pub fn from_query(request: &HttpRequest) -> Self {
        let mut ret = Self::default();
        ret.timestamp = Some(OffsetDateTime::now_utc());

        ret.user_agents = extract_user_agents(request).into_iter().collect();
        ret.total_received = 1;
        ret
    }

    /// Aggregate one [DocumentsAggregator] into another.
    pub fn aggregate(&mut self, other: Self) {
        if self.timestamp.is_none() {
            self.timestamp = other.timestamp;
        }

        // we can't create a union because there is no `into_union` method
        for user_agent in other.user_agents {
            self.user_agents.insert(user_agent);
        }
        self.total_received = self.total_received.saturating_add(other.total_received);
    }

    pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
        // if we had no timestamp it means we never encountered any events and
        // thus we don't need to send this event.
        let timestamp = self.timestamp?;

        Some(Track {
            timestamp: Some(timestamp),
            user: user.clone(),
            event: event_name.to_string(),
            properties: serde_json::to_value(self).ok()?,
            ..Default::default()
        })
    }
}

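The `SegmentAnalytics` implementation above never blocks a request on analytics: every hook builds an aggregate and hands it off with `try_send`, and the background `Segment` task drains the channel, merges aggregates, and flushes batches on a timer. Below is a reduced, std-only sketch of that fire-and-forget shape; the real code uses a tokio mpsc channel and segment's `AutoBatcher`, and the names and timings here are illustrative only.

use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;
use std::time::Duration;

enum AnalyticsMsg {
    DeleteDocuments(&'static str),
    Health,
}

fn main() {
    // Bounded channel: if the consumer falls behind, messages are dropped
    // instead of ever blocking the request path.
    let (sender, receiver) = sync_channel::<AnalyticsMsg>(100);

    // Background consumer: drain what arrived, "flush", sleep, repeat.
    let consumer = thread::spawn(move || {
        for _ in 0..3 {
            thread::sleep(Duration::from_millis(50));
            let mut batch = 0;
            while let Ok(msg) = receiver.try_recv() {
                match msg {
                    AnalyticsMsg::DeleteDocuments(kind) => println!("aggregated deletion: {kind}"),
                    AnalyticsMsg::Health => println!("aggregated health check"),
                }
                batch += 1;
            }
            println!("flushed {batch} message(s)");
        }
    });

    // Producer side: fire-and-forget, errors are ignored on purpose.
    let _ = sender.try_send(AnalyticsMsg::DeleteDocuments("per_batch"));
    let _ = sender.try_send(AnalyticsMsg::Health);
    if let Err(TrySendError::Full(_)) = sender.try_send(AnalyticsMsg::Health) {
        // A full queue is acceptable: analytics must never slow down requests.
    }

    drop(sender);
    consumer.join().unwrap();
}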
@@ -31,11 +31,14 @@ impl<P, D> GuardedData<P, D> {
    where
        P: Policy + 'static,
    {
        let missing_master_key = auth.get_master_key().is_none();

        match Self::authenticate(auth, token, index).await? {
            Some(filters) => match data {
                Some(data) => Ok(Self { data, filters, _marker: PhantomData }),
                None => Err(AuthenticationError::IrretrievableState.into()),
            },
            None if missing_master_key => Err(AuthenticationError::MissingMasterKey.into()),
            None => Err(AuthenticationError::InvalidToken.into()),
        }
    }

@@ -128,7 +128,13 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Auth
            autobatching_enabled: !opt.scheduler_options.disable_auto_batching,
        })
    };
    let meilisearch_builder = || -> anyhow::Result<_> {

    enum OnFailure {
        RemoveDb,
        KeepDb,
    }

    let meilisearch_builder = |on_failure: OnFailure| -> anyhow::Result<_> {
        // if anything wrong happens we delete the `data.ms` entirely.
        match (
            index_scheduler_builder().map_err(anyhow::Error::from),
@@ -137,7 +143,9 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Auth
        ) {
            (Ok(i), Ok(a), Ok(())) => Ok((i, a)),
            (Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)) => {
                std::fs::remove_dir_all(&opt.db_path)?;
                if matches!(on_failure, OnFailure::RemoveDb) {
                    std::fs::remove_dir_all(&opt.db_path)?;
                }
                Err(e)
            }
        }
@@ -148,7 +156,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Auth
        let snapshot_path_exists = snapshot_path.exists();
        if empty_db && snapshot_path_exists {
            match compression::from_tar_gz(snapshot_path, &opt.db_path) {
                Ok(()) => meilisearch_builder()?,
                Ok(()) => meilisearch_builder(OnFailure::RemoveDb)?,
                Err(e) => {
                    std::fs::remove_dir_all(&opt.db_path)?;
                    return Err(e);
@@ -162,12 +170,13 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Auth
        } else if !snapshot_path_exists && !opt.ignore_missing_snapshot {
            bail!("snapshot doesn't exist at {}", snapshot_path.display())
        } else {
            meilisearch_builder()?
            meilisearch_builder(OnFailure::RemoveDb)?
        }
    } else if let Some(ref path) = opt.import_dump {
        let src_path_exists = path.exists();
        if empty_db && src_path_exists {
            let (mut index_scheduler, mut auth_controller) = meilisearch_builder()?;
            let (mut index_scheduler, mut auth_controller) =
                meilisearch_builder(OnFailure::RemoveDb)?;
            match import_dump(&opt.db_path, path, &mut index_scheduler, &mut auth_controller) {
                Ok(()) => (index_scheduler, auth_controller),
                Err(e) => {
@@ -183,7 +192,8 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Auth
        } else if !src_path_exists && !opt.ignore_missing_dump {
            bail!("dump doesn't exist at {:?}", path)
        } else {
            let (mut index_scheduler, mut auth_controller) = meilisearch_builder()?;
            let (mut index_scheduler, mut auth_controller) =
                meilisearch_builder(OnFailure::RemoveDb)?;
            match import_dump(&opt.db_path, path, &mut index_scheduler, &mut auth_controller) {
                Ok(()) => (index_scheduler, auth_controller),
                Err(e) => {
@@ -196,7 +206,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Auth
        if !empty_db {
            check_version_file(&opt.db_path)?;
        }
        meilisearch_builder()?
        meilisearch_builder(OnFailure::KeepDb)?
    };

    // We create a loop in a thread that registers snapshotCreation tasks
@@ -204,12 +214,15 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Auth
    if opt.schedule_snapshot {
        let snapshot_delay = Duration::from_secs(opt.snapshot_interval_sec);
        let index_scheduler = index_scheduler.clone();
        thread::spawn(move || loop {
            thread::sleep(snapshot_delay);
            if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation) {
                error!("Error while registering snapshot: {}", e);
            }
        });
        thread::Builder::new()
            .name(String::from("register-snapshot-tasks"))
            .spawn(move || loop {
                thread::sleep(snapshot_delay);
                if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation) {
                    error!("Error while registering snapshot: {}", e);
                }
            })
            .unwrap();
    }

    Ok((index_scheduler, auth_controller))

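The `OnFailure` parameter introduced above decides whether a failed build wipes `data.ms`: restoring from a snapshot or dump should clean up a half-written database, while a plain startup failure must leave an existing one untouched. Below is a reduced, std-only sketch of that pattern; the paths and helper names are illustrative, not the crate's API.

use std::fs;
use std::io;
use std::path::Path;

enum OnFailure {
    RemoveDb,
    KeepDb,
}

// Run the fallible setup; wipe the data directory only when the caller asked for it.
fn build(db_path: &Path, on_failure: OnFailure) -> io::Result<()> {
    match fallible_setup(db_path) {
        Ok(()) => Ok(()),
        Err(e) => {
            if matches!(on_failure, OnFailure::RemoveDb) {
                let _ = fs::remove_dir_all(db_path);
            }
            Err(e)
        }
    }
}

fn fallible_setup(db_path: &Path) -> io::Result<()> {
    // Stand-in for opening the index scheduler, auth controller, version file, ...
    fs::create_dir_all(db_path)
}

fn main() -> io::Result<()> {
    build(Path::new("/tmp/data.ms.example"), OnFailure::KeepDb)
}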
@@ -69,7 +69,7 @@ const MEILI_MAX_INDEXING_THREADS: &str = "MEILI_MAX_INDEXING_THREADS";
const DISABLE_AUTO_BATCHING: &str = "DISABLE_AUTO_BATCHING";
const DEFAULT_LOG_EVERY_N: usize = 100000;

#[derive(Debug, Clone, Parser, Serialize, Deserialize)]
#[derive(Debug, Clone, Parser, Deserialize)]
#[clap(version, next_display_order = None)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct Opt {
@@ -84,7 +84,6 @@ pub struct Opt {
    pub http_addr: String,

    /// Sets the instance's master key, automatically protecting all routes except `GET /health`.
    #[serde(skip_serializing)]
    #[clap(long, env = MEILI_MASTER_KEY)]
    pub master_key: Option<String>,

@@ -99,7 +98,7 @@ pub struct Opt {
    /// All gathered data is used solely for the purpose of improving Meilisearch, and can be deleted
    /// at any time.
    #[cfg(all(not(debug_assertions), feature = "analytics"))]
    #[serde(skip_serializing, default)] // we can't send true
    #[serde(default)] // we can't send true
    #[clap(long, env = MEILI_NO_ANALYTICS)]
    pub no_analytics: bool,

@@ -121,39 +120,35 @@ pub struct Opt {
    pub http_payload_size_limit: Byte,

    /// Sets the server's SSL certificates.
    #[serde(skip_serializing)]
    #[clap(long, env = MEILI_SSL_CERT_PATH, value_parser)]
    pub ssl_cert_path: Option<PathBuf>,

    /// Sets the server's SSL key files.
    #[serde(skip_serializing)]
    #[clap(long, env = MEILI_SSL_KEY_PATH, value_parser)]
    pub ssl_key_path: Option<PathBuf>,

    /// Enables client authentication in the specified path.
    #[serde(skip_serializing)]
    #[clap(long, env = MEILI_SSL_AUTH_PATH, value_parser)]
    pub ssl_auth_path: Option<PathBuf>,

    /// Sets the server's OCSP file. *Optional*
    ///
    /// Reads DER-encoded OCSP response from OCSPFILE and staple to certificate.
    #[serde(skip_serializing)]
    #[clap(long, env = MEILI_SSL_OCSP_PATH, value_parser)]
    pub ssl_ocsp_path: Option<PathBuf>,

    /// Makes SSL authentication mandatory.
    #[serde(skip_serializing, default)]
    #[serde(default)]
    #[clap(long, env = MEILI_SSL_REQUIRE_AUTH)]
    pub ssl_require_auth: bool,

    /// Activates SSL session resumption.
    #[serde(skip_serializing, default)]
    #[serde(default)]
    #[clap(long, env = MEILI_SSL_RESUMPTION)]
    pub ssl_resumption: bool,

    /// Activates SSL tickets.
    #[serde(skip_serializing, default)]
    #[serde(default)]
    #[clap(long, env = MEILI_SSL_TICKETS)]
    pub ssl_tickets: bool,

@@ -251,7 +246,6 @@ pub struct Opt {

    /// Set the path to a configuration file that should be used to setup the engine.
    /// Format must be TOML.
    #[serde(skip_serializing)]
    #[clap(long)]
    pub config_file_path: Option<PathBuf>,
}
@@ -439,16 +433,15 @@ impl Opt {
    }
}

#[derive(Debug, Clone, Parser, Deserialize, Serialize)]
#[derive(Debug, Clone, Parser, Deserialize)]
pub struct IndexerOpts {
    /// Sets the amount of documents to skip before printing
    /// a log regarding the indexing advancement.
    #[serde(skip_serializing, default = "default_log_every_n")]
    #[serde(default = "default_log_every_n")]
    #[clap(long, default_value_t = default_log_every_n(), hide = true)] // 100k
    pub log_every_n: usize,

    /// Grenad max number of chunks in bytes.
    #[serde(skip_serializing)]
    #[clap(long, hide = true)]
    pub max_nb_chunks: Option<usize>,

@@ -488,7 +481,7 @@ impl IndexerOpts {
    }
}

#[derive(Debug, Clone, Parser, Default, Deserialize, Serialize)]
#[derive(Debug, Clone, Parser, Default, Deserialize)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub struct SchedulerConfig {
    /// Deactivates auto-batching when provided.
@@ -508,8 +501,10 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
    type Error = anyhow::Error;

    fn try_from(other: &IndexerOpts) -> Result<Self, Self::Error> {
        let thread_pool =
            rayon::ThreadPoolBuilder::new().num_threads(*other.max_indexing_threads).build()?;
        let thread_pool = rayon::ThreadPoolBuilder::new()
            .thread_name(|index| format!("indexing-thread:{index}"))
            .num_threads(*other.max_indexing_threads)
            .build()?;

        Ok(Self {
            log_every_n: Some(other.log_every_n),
@@ -580,7 +575,7 @@ fn total_memory_bytes() -> Option<u64> {
        let memory_kind = RefreshKind::new().with_memory();
        let mut system = System::new_with_specifics(memory_kind);
        system.refresh_memory();
        Some(system.total_memory() * 1024) // KiB into bytes
        Some(system.total_memory())
    } else {
        None
    }

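Dropping `Serialize` from `Opt` works because analytics now goes through `Infos::from(Opt)`, which destructures the options struct field by field without a trailing `..`, so adding a new CLI option fails to compile until someone decides how it appears in analytics. Below is a toy version of the same trick; the struct and field names are made up for illustration.

// A tiny options struct standing in for `Opt` (illustrative only).
struct Options {
    db_path: String,
    master_key: Option<String>,
    schedule_snapshot: bool,
}

// The analytics-facing view: paths and secrets collapse to booleans.
#[derive(Debug)]
struct Infos {
    db_path: bool,
    schedule_snapshot: bool,
}

impl From<Options> for Infos {
    fn from(options: Options) -> Self {
        // Destructure without `..`: if a field is ever added to `Options`,
        // this `From` impl stops compiling until the field is handled here.
        let Options { db_path, master_key: _, schedule_snapshot } = options;
        Infos { db_path: db_path != "./data.ms", schedule_snapshot }
    }
}

fn main() {
    let opts = Options {
        db_path: "/var/lib/meili".to_string(),
        master_key: Some("secret".to_string()),
        schedule_snapshot: true,
    };
    println!("{:?}", Infos::from(opts));
}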
@@ -6,8 +6,6 @@ use meilisearch_auth::AuthController;
use meilisearch_types::error::ResponseError;
use meilisearch_types::tasks::KindWithContent;
use serde_json::json;
use time::macros::format_description;
use time::OffsetDateTime;

use crate::analytics::Analytics;
use crate::extractors::authentication::policies::*;
@@ -27,16 +25,9 @@ pub async fn create_dump(
) -> Result<HttpResponse, ResponseError> {
    analytics.publish("Dump Created".to_string(), json!({}), Some(&req));

    let dump_uid = OffsetDateTime::now_utc()
        .format(format_description!(
            "[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]"
        ))
        .unwrap();

    let task = KindWithContent::DumpCreation {
        keys: auth_controller.list_keys()?,
        instance_uid: analytics.instance_uid().cloned(),
        dump_uid,
    };
    let task: SummarizedTaskView =
        tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();

@@ -20,7 +20,7 @@ use serde::Deserialize;
use serde_cs::vec::CS;
use serde_json::Value;
use tempfile::NamedTempFile;
use crate::analytics::Analytics;
use crate::analytics::{Analytics, DocumentDeletionKind};
use crate::error::MeilisearchHttpError;
use crate::error::PayloadError::ReceivePayloadErr;
use crate::extractors::authentication::policies::*;
@@ -95,7 +95,11 @@ pub async fn get_document(
pub async fn delete_document(
    index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
    path: web::Path<DocumentParam>,
    req: HttpRequest,
    analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
    analytics.delete_documents(DocumentDeletionKind::PerDocumentId, &req);

    let DocumentParam { document_id, index_uid } = path.into_inner();
    let task = KindWithContent::DocumentDeletion { index_uid, documents_ids: vec![document_id] };
    let task: SummarizedTaskView =
@@ -321,8 +325,13 @@ pub async fn delete_documents(
    index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
    path: web::Path<String>,
    body: web::Json<Vec<Value>>,
    req: HttpRequest,
    analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
    debug!("called with params: {:?}", body);

    analytics.delete_documents(DocumentDeletionKind::PerBatch, &req);

    let ids = body
        .iter()
        .map(|v| v.as_str().map(String::from).unwrap_or_else(|| v.to_string()))
@@ -340,7 +349,11 @@ pub async fn delete_documents(
pub async fn clear_all_documents(
    index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
    path: web::Path<String>,
    req: HttpRequest,
    analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
    analytics.delete_documents(DocumentDeletionKind::ClearAll, &req);

    let task = KindWithContent::DocumentClear { index_uid: path.into_inner() };
    let task: SummarizedTaskView =
        tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();

@@ -123,17 +123,6 @@ macro_rules! make_setting_route {
            }
        }
    };
    ($route:literal, $update_verb:ident, $type:ty, $attr:ident, $camelcase_attr:literal) => {
        make_setting_route!(
            $route,
            $update_verb,
            $type,
            $attr,
            $camelcase_attr,
            _analytics,
            |_, _| {}
        );
    };
}

make_setting_route!(
@@ -187,7 +176,22 @@ make_setting_route!(
    put,
    Vec<String>,
    displayed_attributes,
    "displayedAttributes"
    "displayedAttributes",
    analytics,
    |displayed: &Option<Vec<String>>, req: &HttpRequest| {
        use serde_json::json;

        analytics.publish(
            "DisplayedAttributes Updated".to_string(),
            json!({
                "displayed_attributes": {
                    "total": displayed.as_ref().map(|displayed| displayed.len()),
                    "with_wildcard": displayed.as_ref().map(|displayed| displayed.iter().any(|displayed| displayed == "*")),
                },
            }),
            Some(req),
        );
    }
);

make_setting_route!(
@@ -247,6 +251,7 @@ make_setting_route!(
        json!({
            "searchable_attributes": {
                "total": setting.as_ref().map(|searchable| searchable.len()),
                "with_wildcard": setting.as_ref().map(|searchable| searchable.iter().any(|searchable| searchable == "*")),
            },
        }),
        Some(req),
@@ -259,7 +264,21 @@ make_setting_route!(
    put,
    std::collections::BTreeSet<String>,
    stop_words,
    "stopWords"
    "stopWords",
    analytics,
    |stop_words: &Option<std::collections::BTreeSet<String>>, req: &HttpRequest| {
        use serde_json::json;

        analytics.publish(
            "StopWords Updated".to_string(),
            json!({
                "stop_words": {
                    "total": stop_words.as_ref().map(|stop_words| stop_words.len()),
                },
            }),
            Some(req),
        );
    }
);

make_setting_route!(
@@ -267,10 +286,43 @@ make_setting_route!(
    put,
    std::collections::BTreeMap<String, Vec<String>>,
    synonyms,
    "synonyms"
    "synonyms",
    analytics,
    |synonyms: &Option<std::collections::BTreeMap<String, Vec<String>>>, req: &HttpRequest| {
        use serde_json::json;

        analytics.publish(
            "Synonyms Updated".to_string(),
            json!({
                "synonyms": {
                    "total": synonyms.as_ref().map(|synonyms| synonyms.len()),
                },
            }),
            Some(req),
        );
    }
);

make_setting_route!("/distinct-attribute", put, String, distinct_attribute, "distinctAttribute");
make_setting_route!(
    "/distinct-attribute",
    put,
    String,
    distinct_attribute,
    "distinctAttribute",
    analytics,
    |distinct: &Option<String>, req: &HttpRequest| {
        use serde_json::json;
        analytics.publish(
            "DistinctAttribute Updated".to_string(),
            json!({
                "distinct_attribute": {
                    "set": distinct.is_some(),
                }
            }),
            Some(req),
        );
    }
);

make_setting_route!(
    "/ranking-rules",
@@ -286,7 +338,13 @@ make_setting_route!(
        "RankingRules Updated".to_string(),
        json!({
            "ranking_rules": {
                "sort_position": setting.as_ref().map(|sort| sort.iter().position(|s| s == "sort")),
                "words_position": setting.as_ref().map(|rr| rr.iter().position(|s| s == "words")),
                "typo_position": setting.as_ref().map(|rr| rr.iter().position(|s| s == "typo")),
                "proximity_position": setting.as_ref().map(|rr| rr.iter().position(|s| s == "proximity")),
                "attribute_position": setting.as_ref().map(|rr| rr.iter().position(|s| s == "attribute")),
                "sort_position": setting.as_ref().map(|rr| rr.iter().position(|s| s == "sort")),
                "exactness_position": setting.as_ref().map(|rr| rr.iter().position(|s| s == "exactness")),
                "values": setting.as_ref().map(|rr| rr.iter().filter(|s| !s.contains(':')).cloned().collect::<Vec<_>>().join(", ")),
            }
        }),
        Some(req),
@@ -379,10 +437,21 @@ pub async fn update_all(
        "Settings Updated".to_string(),
        json!({
            "ranking_rules": {
                "sort_position": new_settings.ranking_rules.as_ref().set().map(|sort| sort.iter().position(|s| s == "sort")),
                "words_position": new_settings.ranking_rules.as_ref().set().map(|rr| rr.iter().position(|s| s == "words")),
                "typo_position": new_settings.ranking_rules.as_ref().set().map(|rr| rr.iter().position(|s| s == "typo")),
                "proximity_position": new_settings.ranking_rules.as_ref().set().map(|rr| rr.iter().position(|s| s == "proximity")),
                "attribute_position": new_settings.ranking_rules.as_ref().set().map(|rr| rr.iter().position(|s| s == "attribute")),
                "sort_position": new_settings.ranking_rules.as_ref().set().map(|rr| rr.iter().position(|s| s == "sort")),
                "exactness_position": new_settings.ranking_rules.as_ref().set().map(|rr| rr.iter().position(|s| s == "exactness")),
                "values": new_settings.ranking_rules.as_ref().set().map(|rr| rr.iter().filter(|s| !s.contains(':')).cloned().collect::<Vec<_>>().join(", ")),
            },
            "searchable_attributes": {
                "total": new_settings.searchable_attributes.as_ref().set().map(|searchable| searchable.len()),
                "with_wildcard": new_settings.searchable_attributes.as_ref().set().map(|searchable| searchable.iter().any(|searchable| searchable == "*")),
            },
            "displayed_attributes": {
                "total": new_settings.displayed_attributes.as_ref().set().map(|displayed| displayed.len()),
                "with_wildcard": new_settings.displayed_attributes.as_ref().set().map(|displayed| displayed.iter().any(|displayed| displayed == "*")),
            },
            "sortable_attributes": {
                "total": new_settings.sortable_attributes.as_ref().set().map(|sort| sort.len()),
@@ -392,6 +461,9 @@ pub async fn update_all(
                "total": new_settings.filterable_attributes.as_ref().set().map(|filter| filter.len()),
                "has_geo": new_settings.filterable_attributes.as_ref().set().map(|filter| filter.iter().any(|s| s == "_geo")),
            },
            "distinct_attribute": {
                "set": new_settings.distinct_attribute.as_ref().set().is_some()
            },
            "typo_tolerance": {
                "enabled": new_settings.typo_tolerance
                    .as_ref()
@@ -435,6 +507,12 @@ pub async fn update_all(
                    .set()
                    .and_then(|s| s.max_total_hits.as_ref().set()),
            },
            "stop_words": {
                "total": new_settings.stop_words.as_ref().set().map(|stop_words| stop_words.len()),
            },
            "synonyms": {
                "total": new_settings.synonyms.as_ref().set().map(|synonyms| synonyms.len()),
            },
        }),
        Some(&req),
    );

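The new arm removed from and re-added to `make_setting_route!` above keeps the old five-argument call sites compiling by forwarding them to the longer form with a throwaway `_analytics` binding and a no-op `|_, _| {}` callback. The same overload trick is shown here in a self-contained toy macro; the macro name, arguments, and callback type are illustrative, not the real `make_setting_route!` signature.

// A toy macro with the same overload shape: the short form forwards to the long
// form with a no-op callback, so existing call sites keep working unchanged.
macro_rules! make_route {
    ($name:literal, $callback:expr) => {{
        let publish: fn(&str) = $callback;
        publish($name);
        println!("registered route {}", $name);
    }};
    ($name:literal) => {
        make_route!($name, (|_: &str| {}) as fn(&str))
    };
}

fn main() {
    // Long form: a per-route analytics callback.
    make_route!("/displayed-attributes", |route: &str| {
        println!("analytics event for {route}");
    });
    // Short form: no analytics, same macro.
    make_route!("/faceting");
}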
@@ -21,7 +21,7 @@ mod api_key;
mod dump;
pub mod indexes;
mod swap_indexes;
mod tasks;
pub mod tasks;

pub fn configure(cfg: &mut web::ServiceConfig) {
    cfg.service(web::scope("/tasks").configure(tasks::configure))
@@ -271,7 +271,7 @@ pub fn create_all_stats(
    let mut indexes = BTreeMap::new();
    let mut database_size = 0;
    let processing_task = index_scheduler.get_tasks_from_authorized_indexes(
        Query { status: Some(vec![Status::Processing]), limit: Some(1), ..Query::default() },
        Query { statuses: Some(vec![Status::Processing]), limit: Some(1), ..Query::default() },
        search_rules.authorized_indexes(),
    )?;
    let processing_index = processing_task.first().and_then(|task| task.index_uid());
@@ -308,7 +308,11 @@ struct VersionResponse {

async fn get_version(
    _index_scheduler: GuardedData<ActionPolicy<{ actions::VERSION }>, Data<IndexScheduler>>,
    req: HttpRequest,
    analytics: web::Data<dyn Analytics>,
) -> HttpResponse {
    analytics.publish("Version Seen".to_string(), json!(null), Some(&req));

    let commit_sha = option_env!("VERGEN_GIT_SHA").unwrap_or("unknown");
    let commit_date = option_env!("VERGEN_GIT_COMMIT_TIMESTAMP").unwrap_or("unknown");

@@ -325,6 +329,11 @@ struct KeysResponse {
    public: Option<String>,
}

pub async fn get_health() -> Result<HttpResponse, ResponseError> {
pub async fn get_health(
    req: HttpRequest,
    analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
    analytics.health_seen(&req);

    Ok(HttpResponse::Ok().json(serde_json::json!({ "status": "available" })))
}

@@ -1,11 +1,13 @@
use actix_web::web::Data;
use actix_web::{web, HttpResponse};
use actix_web::{web, HttpRequest, HttpResponse};
use index_scheduler::IndexScheduler;
use meilisearch_types::error::ResponseError;
use meilisearch_types::tasks::{IndexSwap, KindWithContent};
use serde::Deserialize;
use serde_json::json;

use super::SummarizedTaskView;
use crate::analytics::Analytics;
use crate::error::MeilisearchHttpError;
use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::{AuthenticationError, GuardedData};
@@ -23,7 +25,16 @@ pub struct SwapIndexesPayload {
pub async fn swap_indexes(
    index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_SWAP }>, Data<IndexScheduler>>,
    params: web::Json<Vec<SwapIndexesPayload>>,
    req: HttpRequest,
    analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
    analytics.publish(
        "Indexes Swapped".to_string(),
        json!({
            "swap_operation_number": params.len(),
        }),
        Some(&req),
    );
    let search_rules = &index_scheduler.filters().search_rules;

    let mut swaps = vec![];

File diff suppressed because it is too large