mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-07-20 05:20:36 +00:00
Compare commits
3 Commits
try-merge-
...
inspecting
Author | SHA1 | Date | |
---|---|---|---|
e644aa07a9 | |||
497d15685c | |||
9776136f92 |
10
Cargo.lock
generated
10
Cargo.lock
generated
@ -2506,6 +2506,14 @@ dependencies = [
|
|||||||
"generic-array",
|
"generic-array",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "inspecting-allocator"
|
||||||
|
version = "1.8.0"
|
||||||
|
dependencies = [
|
||||||
|
"tracing",
|
||||||
|
"tracing-error",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "insta"
|
name = "insta"
|
||||||
version = "1.34.0"
|
version = "1.34.0"
|
||||||
@ -3316,6 +3324,7 @@ dependencies = [
|
|||||||
"http 0.2.11",
|
"http 0.2.11",
|
||||||
"index-scheduler",
|
"index-scheduler",
|
||||||
"indexmap",
|
"indexmap",
|
||||||
|
"inspecting-allocator",
|
||||||
"insta",
|
"insta",
|
||||||
"is-terminal",
|
"is-terminal",
|
||||||
"itertools 0.11.0",
|
"itertools 0.11.0",
|
||||||
@ -3365,6 +3374,7 @@ dependencies = [
|
|||||||
"toml",
|
"toml",
|
||||||
"tracing",
|
"tracing",
|
||||||
"tracing-actix-web",
|
"tracing-actix-web",
|
||||||
|
"tracing-error",
|
||||||
"tracing-subscriber",
|
"tracing-subscriber",
|
||||||
"tracing-trace",
|
"tracing-trace",
|
||||||
"url",
|
"url",
|
||||||
|
@ -18,7 +18,7 @@ members = [
|
|||||||
"fuzzers",
|
"fuzzers",
|
||||||
"tracing-trace",
|
"tracing-trace",
|
||||||
"xtask",
|
"xtask",
|
||||||
"build-info",
|
"build-info", "inspecting-allocator",
|
||||||
]
|
]
|
||||||
|
|
||||||
[workspace.package]
|
[workspace.package]
|
||||||
|
15
inspecting-allocator/Cargo.toml
Normal file
15
inspecting-allocator/Cargo.toml
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
[package]
|
||||||
|
name = "inspecting-allocator"
|
||||||
|
version.workspace = true
|
||||||
|
authors.workspace = true
|
||||||
|
description.workspace = true
|
||||||
|
homepage.workspace = true
|
||||||
|
readme.workspace = true
|
||||||
|
edition.workspace = true
|
||||||
|
license.workspace = true
|
||||||
|
|
||||||
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
tracing = "0.1.40"
|
||||||
|
tracing-error = { version = "0.2.0", default-features = false }
|
160
inspecting-allocator/src/lib.rs
Normal file
160
inspecting-allocator/src/lib.rs
Normal file
@ -0,0 +1,160 @@
|
|||||||
|
use std::alloc::GlobalAlloc;
|
||||||
|
use std::cell::{Cell, RefCell};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::atomic::AtomicU64;
|
||||||
|
|
||||||
|
use tracing_error::SpanTrace;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct AllocEntry {
|
||||||
|
generation: u64,
|
||||||
|
span: SpanTrace,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AllocEntry {
|
||||||
|
pub fn generation(&self) -> u64 {
|
||||||
|
self.generation
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for AllocEntry {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
let mut res = Ok(());
|
||||||
|
let mut depth = 0;
|
||||||
|
self.span.with_spans(|metadata, fields| {
|
||||||
|
let name_with_module_name: Vec<&str> = metadata
|
||||||
|
.module_path()
|
||||||
|
.into_iter()
|
||||||
|
.chain(std::iter::once(metadata.name()))
|
||||||
|
.collect();
|
||||||
|
let name_with_module_name = name_with_module_name.join("::");
|
||||||
|
let location = format!(
|
||||||
|
"{}:{}",
|
||||||
|
metadata.file().unwrap_or_default(),
|
||||||
|
metadata.line().unwrap_or_default()
|
||||||
|
);
|
||||||
|
if let Err(error) =
|
||||||
|
writeln!(f, "[{depth}]{name_with_module_name}({fields}) at {location}")
|
||||||
|
{
|
||||||
|
res = Err(error);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
depth += 1;
|
||||||
|
true
|
||||||
|
});
|
||||||
|
res
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct AllocatorState {
|
||||||
|
is_allocating: Cell<bool>,
|
||||||
|
state: RefCell<HashMap<*mut u8, AllocEntry>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
thread_local! {
|
||||||
|
static ALLOCATOR_STATE: AllocatorState = AllocatorState { is_allocating: Cell::new(false), state: RefCell::new(Default::default()) };
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct InspectingAllocator<InnerAllocator> {
|
||||||
|
inner: InnerAllocator,
|
||||||
|
current_generation: AtomicU64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AllocatorState {
|
||||||
|
fn handle_alloc(&self, allocated: *mut u8, current_generation: u64) -> *mut u8 {
|
||||||
|
if self.is_allocating.get() {
|
||||||
|
return allocated;
|
||||||
|
}
|
||||||
|
self.is_allocating.set(true);
|
||||||
|
{
|
||||||
|
self.state.borrow_mut().insert(
|
||||||
|
allocated,
|
||||||
|
AllocEntry { generation: current_generation, span: SpanTrace::capture() },
|
||||||
|
);
|
||||||
|
}
|
||||||
|
self.is_allocating.set(false);
|
||||||
|
|
||||||
|
allocated
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_dealloc(&self, allocated: *mut u8) {
|
||||||
|
if self.is_allocating.get() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
self.is_allocating.set(true);
|
||||||
|
{
|
||||||
|
self.state.borrow_mut().remove(&allocated);
|
||||||
|
}
|
||||||
|
self.is_allocating.set(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn find_older_generations(&self, older_generation: u64) -> Vec<(*mut u8, AllocEntry)> {
|
||||||
|
if self.is_allocating.get() {
|
||||||
|
return Vec::new();
|
||||||
|
}
|
||||||
|
self.is_allocating.set(true);
|
||||||
|
let mut entries = Vec::new();
|
||||||
|
self.state.borrow_mut().retain(|k, v| {
|
||||||
|
if v.generation > older_generation {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
entries.push((*k, v.clone()));
|
||||||
|
false
|
||||||
|
});
|
||||||
|
self.is_allocating.set(false);
|
||||||
|
entries
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<A> InspectingAllocator<A> {
|
||||||
|
pub const fn wrap(inner: A) -> Self {
|
||||||
|
Self { inner, current_generation: AtomicU64::new(0) }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn next_generation(&self) {
|
||||||
|
self.current_generation.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn find_older_generations(&self, older_than: u64) -> Vec<(*mut u8, AllocEntry)> {
|
||||||
|
let current_generation = self.current_generation.load(std::sync::atomic::Ordering::Relaxed);
|
||||||
|
if current_generation < older_than {
|
||||||
|
return Vec::new();
|
||||||
|
}
|
||||||
|
ALLOCATOR_STATE.with(|allocator_state| {
|
||||||
|
allocator_state.find_older_generations(current_generation - older_than)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe impl<InnerAllocator: GlobalAlloc> GlobalAlloc for InspectingAllocator<InnerAllocator> {
|
||||||
|
unsafe fn alloc(&self, layout: std::alloc::Layout) -> *mut u8 {
|
||||||
|
let allocated = self.inner.alloc(layout);
|
||||||
|
let current_generation = self.current_generation.load(std::sync::atomic::Ordering::Relaxed);
|
||||||
|
ALLOCATOR_STATE
|
||||||
|
.with(|allocator_state| allocator_state.handle_alloc(allocated, current_generation))
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe fn dealloc(&self, ptr: *mut u8, layout: std::alloc::Layout) {
|
||||||
|
self.inner.dealloc(ptr, layout);
|
||||||
|
ALLOCATOR_STATE.with(|allocator_state| allocator_state.handle_dealloc(ptr))
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe fn alloc_zeroed(&self, layout: std::alloc::Layout) -> *mut u8 {
|
||||||
|
let allocated = self.inner.alloc_zeroed(layout);
|
||||||
|
|
||||||
|
let current_generation = self.current_generation.load(std::sync::atomic::Ordering::Relaxed);
|
||||||
|
ALLOCATOR_STATE
|
||||||
|
.with(|allocator_state| allocator_state.handle_alloc(allocated, current_generation))
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe fn realloc(&self, ptr: *mut u8, layout: std::alloc::Layout, new_size: usize) -> *mut u8 {
|
||||||
|
let reallocated = self.inner.realloc(ptr, layout, new_size);
|
||||||
|
if reallocated == ptr {
|
||||||
|
return reallocated;
|
||||||
|
}
|
||||||
|
let current_generation = self.current_generation.load(std::sync::atomic::Ordering::Relaxed);
|
||||||
|
ALLOCATOR_STATE.with(|allocator_state| allocator_state.handle_dealloc(ptr));
|
||||||
|
ALLOCATOR_STATE
|
||||||
|
.with(|allocator_state| allocator_state.handle_alloc(reallocated, current_generation))
|
||||||
|
}
|
||||||
|
}
|
@ -108,6 +108,8 @@ tracing-subscriber = { version = "0.3.18", features = ["json"] }
|
|||||||
tracing-trace = { version = "0.1.0", path = "../tracing-trace" }
|
tracing-trace = { version = "0.1.0", path = "../tracing-trace" }
|
||||||
tracing-actix-web = "0.7.9"
|
tracing-actix-web = "0.7.9"
|
||||||
build-info = { version = "1.7.0", path = "../build-info" }
|
build-info = { version = "1.7.0", path = "../build-info" }
|
||||||
|
inspecting-allocator = { version = "1.8.0", path = "../inspecting-allocator" }
|
||||||
|
tracing-error = { version = "0.2.0", default-features = false }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-rt = "2.9.0"
|
actix-rt = "2.9.0"
|
||||||
|
@ -16,15 +16,11 @@ use meilisearch::{
|
|||||||
LogStderrType, Opt, SubscriberForSecondLayer,
|
LogStderrType, Opt, SubscriberForSecondLayer,
|
||||||
};
|
};
|
||||||
use meilisearch_auth::{generate_master_key, AuthController, MASTER_KEY_MIN_SIZE};
|
use meilisearch_auth::{generate_master_key, AuthController, MASTER_KEY_MIN_SIZE};
|
||||||
use mimalloc::MiMalloc;
|
|
||||||
use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor};
|
use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor};
|
||||||
use tracing::level_filters::LevelFilter;
|
use tracing::level_filters::LevelFilter;
|
||||||
use tracing_subscriber::layer::SubscriberExt as _;
|
use tracing_subscriber::layer::SubscriberExt as _;
|
||||||
use tracing_subscriber::Layer;
|
use tracing_subscriber::Layer;
|
||||||
|
|
||||||
#[global_allocator]
|
|
||||||
static ALLOC: MiMalloc = MiMalloc;
|
|
||||||
|
|
||||||
fn default_log_route_layer() -> LogRouteType {
|
fn default_log_route_layer() -> LogRouteType {
|
||||||
None.with_filter(tracing_subscriber::filter::Targets::new().with_target("", LevelFilter::OFF))
|
None.with_filter(tracing_subscriber::filter::Targets::new().with_target("", LevelFilter::OFF))
|
||||||
}
|
}
|
||||||
@ -56,8 +52,10 @@ fn setup(opt: &Opt) -> anyhow::Result<(LogRouteHandle, LogStderrHandle)> {
|
|||||||
let (stderr_layer, stderr_layer_handle) =
|
let (stderr_layer, stderr_layer_handle) =
|
||||||
tracing_subscriber::reload::Layer::new(default_log_stderr_layer(opt));
|
tracing_subscriber::reload::Layer::new(default_log_stderr_layer(opt));
|
||||||
let route_layer: tracing_subscriber::reload::Layer<_, _> = route_layer;
|
let route_layer: tracing_subscriber::reload::Layer<_, _> = route_layer;
|
||||||
|
let error_layer = tracing_error::ErrorLayer::default();
|
||||||
|
|
||||||
let subscriber = tracing_subscriber::registry().with(route_layer).with(stderr_layer);
|
let subscriber =
|
||||||
|
tracing_subscriber::registry().with(route_layer).with(stderr_layer).with(error_layer);
|
||||||
|
|
||||||
// set the subscriber as the default for the application
|
// set the subscriber as the default for the application
|
||||||
tracing::subscriber::set_global_default(subscriber).unwrap();
|
tracing::subscriber::set_global_default(subscriber).unwrap();
|
||||||
|
@ -8,6 +8,7 @@ use deserr::actix_web::{AwebJson, AwebQueryParameter};
|
|||||||
use deserr::Deserr;
|
use deserr::Deserr;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use index_scheduler::{IndexScheduler, TaskId};
|
use index_scheduler::{IndexScheduler, TaskId};
|
||||||
|
use inspecting_allocator::InspectingAllocator;
|
||||||
use meilisearch_types::deserr::query_params::Param;
|
use meilisearch_types::deserr::query_params::Param;
|
||||||
use meilisearch_types::deserr::{DeserrJsonError, DeserrQueryParamError};
|
use meilisearch_types::deserr::{DeserrJsonError, DeserrQueryParamError};
|
||||||
use meilisearch_types::document_formats::{read_csv, read_json, read_ndjson, PayloadType};
|
use meilisearch_types::document_formats::{read_csv, read_json, read_ndjson, PayloadType};
|
||||||
@ -20,6 +21,7 @@ use meilisearch_types::milli::DocumentId;
|
|||||||
use meilisearch_types::star_or::OptionStarOrList;
|
use meilisearch_types::star_or::OptionStarOrList;
|
||||||
use meilisearch_types::tasks::KindWithContent;
|
use meilisearch_types::tasks::KindWithContent;
|
||||||
use meilisearch_types::{milli, Document, Index};
|
use meilisearch_types::{milli, Document, Index};
|
||||||
|
use mimalloc::MiMalloc;
|
||||||
use mime::Mime;
|
use mime::Mime;
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
@ -46,6 +48,9 @@ static ACCEPTED_CONTENT_TYPE: Lazy<Vec<String>> = Lazy::new(|| {
|
|||||||
vec!["application/json".to_string(), "application/x-ndjson".to_string(), "text/csv".to_string()]
|
vec!["application/json".to_string(), "application/x-ndjson".to_string(), "text/csv".to_string()]
|
||||||
});
|
});
|
||||||
|
|
||||||
|
#[global_allocator]
|
||||||
|
static ALLOC: InspectingAllocator<MiMalloc> = InspectingAllocator::wrap(MiMalloc);
|
||||||
|
|
||||||
/// Extracts the mime type from the content type and return
|
/// Extracts the mime type from the content type and return
|
||||||
/// a meilisearch error if anything bad happen.
|
/// a meilisearch error if anything bad happen.
|
||||||
fn extract_mime_type(req: &HttpRequest) -> Result<Option<Mime>, MeilisearchHttpError> {
|
fn extract_mime_type(req: &HttpRequest) -> Result<Option<Mime>, MeilisearchHttpError> {
|
||||||
@ -468,6 +473,14 @@ async fn document_addition(
|
|||||||
};
|
};
|
||||||
|
|
||||||
let scheduler = index_scheduler.clone();
|
let scheduler = index_scheduler.clone();
|
||||||
|
ALLOC.next_generation();
|
||||||
|
for (address, entry) in ALLOC.find_older_generations(5) {
|
||||||
|
println!(
|
||||||
|
"Found allocation older than 5 generations: {address:p} in generation {}. Span trace",
|
||||||
|
entry.generation()
|
||||||
|
);
|
||||||
|
println!("{entry}")
|
||||||
|
}
|
||||||
let task = match tokio::task::spawn_blocking(move || scheduler.register(task, task_id, dry_run))
|
let task = match tokio::task::spawn_blocking(move || scheduler.register(task, task_id, dry_run))
|
||||||
.await?
|
.await?
|
||||||
{
|
{
|
||||||
|
Reference in New Issue
Block a user