fix all the routes + move to a better version of mopa

This commit is contained in:
Tamo
2024-10-17 01:04:25 +02:00
parent aa7a34ffe8
commit e4ace98004
10 changed files with 65 additions and 75 deletions

View File

@@ -104,7 +104,7 @@ tracing-trace = { version = "0.1.0", path = "../tracing-trace" }
tracing-actix-web = "0.7.11"
build-info = { version = "1.7.0", path = "../build-info" }
roaring = "0.10.2"
mopa = "0.2.2"
mopa-maintained = "0.2.3"
[dev-dependencies]
actix-rt = "2.10.0"

View File

@@ -1,3 +1,5 @@
#![allow(clippy::transmute_ptr_to_ref)] // mopify isn't updated with the latest version of clippy yet
pub mod segment_analytics;
use std::fs;

View File

@@ -69,21 +69,18 @@ impl Aggregate for PatchExperimentalFeatureAnalytics {
"Experimental features Updated"
}
fn aggregate(self, other: Self) -> Self
where
Self: Sized,
{
Self {
fn aggregate(self: Box<Self>, other: Box<Self>) -> Box<Self> {
Box::new(Self {
vector_store: other.vector_store,
metrics: other.metrics,
logs_route: other.logs_route,
edit_documents_by_function: other.edit_documents_by_function,
contains_filter: other.contains_filter,
}
})
}
fn into_event(self) -> impl Serialize {
self
fn into_event(self: Box<Self>) -> serde_json::Value {
serde_json::to_value(*self).unwrap_or_default()
}
}

View File

@@ -162,8 +162,8 @@ impl<Method: AggregateMethod> Aggregate for DocumentsFetchAggregator<Method> {
Method::event_name()
}
fn aggregate(self, other: Self) -> Self {
Self {
fn aggregate(self: Box<Self>, other: Box<Self>) -> Box<Self> {
Box::new(Self {
total_received: self.total_received.saturating_add(other.total_received),
per_document_id: self.per_document_id | other.per_document_id,
per_filter: self.per_filter | other.per_filter,
@@ -171,11 +171,11 @@ impl<Method: AggregateMethod> Aggregate for DocumentsFetchAggregator<Method> {
max_limit: self.max_limit.max(other.max_limit),
max_offset: self.max_offset.max(other.max_offset),
marker: PhantomData,
}
})
}
fn into_event(self) -> impl Serialize {
self
fn into_event(self: Box<Self>) -> serde_json::Value {
serde_json::to_value(*self).unwrap_or_default()
}
}
@@ -226,21 +226,18 @@ impl Aggregate for DocumentsDeletionAggregator {
"Documents Deleted"
}
fn aggregate(self, other: Self) -> Self
where
Self: Sized,
{
Self {
fn aggregate(self: Box<Self>, other: Box<Self>) -> Box<Self> {
Box::new(Self {
total_received: self.total_received.saturating_add(other.total_received),
per_document_id: self.per_document_id | other.per_document_id,
clear_all: self.clear_all | other.clear_all,
per_batch: self.per_batch | other.per_batch,
per_filter: self.per_filter | other.per_filter,
}
})
}
fn into_event(self) -> impl Serialize {
self
fn into_event(self: Box<Self>) -> serde_json::Value {
serde_json::to_value(*self).unwrap_or_default()
}
}
@@ -443,17 +440,17 @@ impl<Method: AggregateMethod> Aggregate for DocumentsAggregator<Method> {
Method::event_name()
}
fn aggregate(self, other: Self) -> Self {
Self {
fn aggregate(self: Box<Self>, other: Box<Self>) -> Box<Self> {
Box::new(Self {
payload_types: self.payload_types.union(&other.payload_types).cloned().collect(),
primary_key: self.primary_key.union(&other.primary_key).cloned().collect(),
index_creation: self.index_creation | other.index_creation,
method: PhantomData,
}
})
}
fn into_event(self) -> impl Serialize {
self
fn into_event(self: Box<Self>) -> serde_json::Value {
serde_json::to_value(self).unwrap_or_default()
}
}
@@ -811,19 +808,16 @@ impl Aggregate for EditDocumentsByFunctionAggregator {
"Documents Edited By Function"
}
fn aggregate(self, other: Self) -> Self
where
Self: Sized,
{
Self {
fn aggregate(self: Box<Self>, other: Box<Self>) -> Box<Self> {
Box::new(Self {
filtered: self.filtered | other.filtered,
with_context: self.with_context | other.with_context,
index_creation: self.index_creation | other.index_creation,
}
})
}
fn into_event(self) -> impl Serialize {
self
fn into_event(self: Box<Self>) -> serde_json::Value {
serde_json::to_value(*self).unwrap_or_default()
}
}

View File

@@ -114,29 +114,29 @@ impl Aggregate for FacetSearchAggregator {
"Facet Searched POST"
}
fn aggregate(mut self, other: Self) -> Self {
fn aggregate(mut self: Box<Self>, other: Box<Self>) -> Box<Self> {
for time in other.time_spent {
self.time_spent.push(time);
}
Self {
Box::new(Self {
total_received: self.total_received.saturating_add(other.total_received),
total_succeeded: self.total_succeeded.saturating_add(other.total_succeeded),
time_spent: self.time_spent,
facet_names: self.facet_names.union(&other.facet_names).cloned().collect(),
additional_search_parameters_provided: self.additional_search_parameters_provided
| other.additional_search_parameters_provided,
}
})
}
fn into_event(self) -> impl Serialize {
fn into_event(self: Box<Self>) -> serde_json::Value {
let Self {
total_received,
total_succeeded,
time_spent,
facet_names,
additional_search_parameters_provided,
} = self;
} = *self;
// the index of the 99th percentage of value
let percentile_99th = 0.99 * (total_succeeded as f64 - 1.) + 1.;
// we get all the values in a sorted manner

View File

@@ -133,15 +133,14 @@ impl Aggregate for IndexCreatedAggregate {
"Index Created"
}
fn aggregate(self, other: Self) -> Self
where
Self: Sized,
{
Self { primary_key: self.primary_key.union(&other.primary_key).cloned().collect() }
fn aggregate(self: Box<Self>, other: Box<Self>) -> Box<Self> {
Box::new(Self {
primary_key: self.primary_key.union(&other.primary_key).cloned().collect(),
})
}
fn into_event(self) -> impl Serialize {
self
fn into_event(self: Box<Self>) -> serde_json::Value {
serde_json::to_value(*self).unwrap_or_default()
}
}
@@ -225,12 +224,14 @@ impl Aggregate for IndexUpdatedAggregate {
"Index Updated"
}
fn aggregate(self, other: Self) -> Self {
Self { primary_key: self.primary_key.union(&other.primary_key).cloned().collect() }
fn aggregate(self: Box<Self>, other: Box<Self>) -> Box<Self> {
Box::new(Self {
primary_key: self.primary_key.union(&other.primary_key).cloned().collect(),
})
}
fn into_event(self) -> impl Serialize {
self
fn into_event(self: Box<Self>) -> serde_json::Value {
serde_json::to_value(*self).unwrap_or_default()
}
}
pub async fn update_index(

View File

@@ -437,11 +437,8 @@ impl Aggregate for SettingsAnalytics {
"Settings Updated"
}
fn aggregate(self, other: Self) -> Self
where
Self: Sized,
{
Self {
fn aggregate(self: Box<Self>, other: Box<Self>) -> Box<Self> {
Box::new(Self {
ranking_rules: RankingRulesAnalytics {
words_position: self
.ranking_rules
@@ -586,14 +583,11 @@ impl Aggregate for SettingsAnalytics {
non_separator_tokens: NonSeparatorTokensAnalytics {
total: self.non_separator_tokens.total.or(other.non_separator_tokens.total),
},
}
})
}
fn into_event(self) -> impl Serialize
where
Self: Sized,
{
self
fn into_event(self: Box<Self>) -> serde_json::Value {
serde_json::to_value(*self).unwrap_or_default()
}
}

View File

@@ -39,12 +39,14 @@ impl Aggregate for IndexSwappedAnalytics {
"Indexes Swapped"
}
fn aggregate(self, other: Self) -> Self {
Self { swap_operation_number: self.swap_operation_number.max(other.swap_operation_number) }
fn aggregate(self: Box<Self>, other: Box<Self>) -> Box<Self> {
Box::new(Self {
swap_operation_number: self.swap_operation_number.max(other.swap_operation_number),
})
}
fn into_event(self) -> impl Serialize {
self
fn into_event(self: Box<Self>) -> serde_json::Value {
serde_json::to_value(*self).unwrap_or_default()
}
}

View File

@@ -185,8 +185,8 @@ impl<Method: AggregateMethod + 'static> Aggregate for TaskFilterAnalytics<Method
Method::event_name()
}
fn aggregate(self, other: Self) -> Self {
Self {
fn aggregate(self: Box<Self>, other: Box<Self>) -> Box<Self> {
Box::new(Self {
filtered_by_uid: self.filtered_by_uid | other.filtered_by_uid,
filtered_by_index_uid: self.filtered_by_index_uid | other.filtered_by_index_uid,
filtered_by_type: self.filtered_by_type | other.filtered_by_type,
@@ -206,11 +206,11 @@ impl<Method: AggregateMethod + 'static> Aggregate for TaskFilterAnalytics<Method
| other.filtered_by_after_finished_at,
marker: std::marker::PhantomData,
}
})
}
fn into_event(self) -> impl Serialize {
self
fn into_event(self: Box<Self>) -> serde_json::Value {
serde_json::to_value(*self).unwrap_or_default()
}
}