mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-07-19 13:00:46 +00:00
Compare commits
21 Commits
prototype-
...
inspecting
Author | SHA1 | Date | |
---|---|---|---|
e644aa07a9 | |||
497d15685c | |||
9776136f92 | |||
c668043c4f | |||
5a305bfdea | |||
f4dd73ec8c | |||
66dce4600d | |||
fe51ceca6d | |||
88174b8ae4 | |||
ebca29f3de | |||
c793b6ef6d | |||
cbbfff3594 | |||
dbcf50589b | |||
3e5cd027a5 | |||
7468c1cf8d | |||
d4aeff92d0 | |||
e87cb373de | |||
9b76501875 | |||
b3173d0423 | |||
96cc5319c8 | |||
0c7003c5df |
14
Cargo.lock
generated
14
Cargo.lock
generated
@ -889,9 +889,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "charabia"
|
||||
version = "0.8.9"
|
||||
version = "0.8.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f6a65052f308636e5d5e1777f0dbc07919f5fbac24b6c8ad3e140472e5520de9"
|
||||
checksum = "933f20f2269b24d32fd5503e7b3c268af902190daf8d9d2b73ed2e75d77c00b4"
|
||||
dependencies = [
|
||||
"aho-corasick",
|
||||
"cow-utils",
|
||||
@ -2506,6 +2506,14 @@ dependencies = [
|
||||
"generic-array",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "inspecting-allocator"
|
||||
version = "1.8.0"
|
||||
dependencies = [
|
||||
"tracing",
|
||||
"tracing-error",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "insta"
|
||||
version = "1.34.0"
|
||||
@ -3316,6 +3324,7 @@ dependencies = [
|
||||
"http 0.2.11",
|
||||
"index-scheduler",
|
||||
"indexmap",
|
||||
"inspecting-allocator",
|
||||
"insta",
|
||||
"is-terminal",
|
||||
"itertools 0.11.0",
|
||||
@ -3365,6 +3374,7 @@ dependencies = [
|
||||
"toml",
|
||||
"tracing",
|
||||
"tracing-actix-web",
|
||||
"tracing-error",
|
||||
"tracing-subscriber",
|
||||
"tracing-trace",
|
||||
"url",
|
||||
|
@ -18,7 +18,7 @@ members = [
|
||||
"fuzzers",
|
||||
"tracing-trace",
|
||||
"xtask",
|
||||
"build-info",
|
||||
"build-info", "inspecting-allocator",
|
||||
]
|
||||
|
||||
[workspace.package]
|
||||
|
@ -256,8 +256,8 @@ pub(crate) mod test {
|
||||
|
||||
pub fn create_test_settings() -> Settings<Checked> {
|
||||
let settings = Settings {
|
||||
displayed_attributes: Setting::Set(vec![S("race"), S("name")]),
|
||||
searchable_attributes: Setting::Set(vec![S("name"), S("race")]),
|
||||
displayed_attributes: Setting::Set(vec![S("race"), S("name")]).into(),
|
||||
searchable_attributes: Setting::Set(vec![S("name"), S("race")]).into(),
|
||||
filterable_attributes: Setting::Set(btreeset! { S("race"), S("age") }),
|
||||
sortable_attributes: Setting::Set(btreeset! { S("age") }),
|
||||
ranking_rules: Setting::NotSet,
|
||||
|
@ -315,8 +315,8 @@ impl From<v5::ResponseError> for v6::ResponseError {
|
||||
impl<T> From<v5::Settings<T>> for v6::Settings<v6::Unchecked> {
|
||||
fn from(settings: v5::Settings<T>) -> Self {
|
||||
v6::Settings {
|
||||
displayed_attributes: settings.displayed_attributes.into(),
|
||||
searchable_attributes: settings.searchable_attributes.into(),
|
||||
displayed_attributes: v6::Setting::from(settings.displayed_attributes).into(),
|
||||
searchable_attributes: v6::Setting::from(settings.searchable_attributes).into(),
|
||||
filterable_attributes: settings.filterable_attributes.into(),
|
||||
sortable_attributes: settings.sortable_attributes.into(),
|
||||
ranking_rules: {
|
||||
|
@ -6,7 +6,7 @@ source: index-scheduler/src/lib.rs
|
||||
[]
|
||||
----------------------------------------------------------------------
|
||||
### All Tasks:
|
||||
0 {uid: 0, status: enqueued, details: { settings: Settings { displayed_attributes: NotSet, searchable_attributes: NotSet, filterable_attributes: NotSet, sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: Set({"default": Set(EmbeddingSettings { source: Set(Rest), model: NotSet, revision: NotSet, api_key: Set("My super secret"), dimensions: Set(4), document_template: NotSet, url: Set("http://localhost:7777"), query: NotSet, input_field: NotSet, path_to_embeddings: NotSet, embedding_object: NotSet, input_type: NotSet, distribution: NotSet })}), search_cutoff_ms: NotSet, _kind: PhantomData<meilisearch_types::settings::Unchecked> } }, kind: SettingsUpdate { index_uid: "doggos", new_settings: Settings { displayed_attributes: NotSet, searchable_attributes: NotSet, filterable_attributes: NotSet, sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: Set({"default": Set(EmbeddingSettings { source: Set(Rest), model: NotSet, revision: NotSet, api_key: Set("My super secret"), dimensions: Set(4), document_template: NotSet, url: Set("http://localhost:7777"), query: NotSet, input_field: NotSet, path_to_embeddings: NotSet, embedding_object: NotSet, input_type: NotSet, distribution: NotSet })}), search_cutoff_ms: NotSet, _kind: PhantomData<meilisearch_types::settings::Unchecked> }, is_deletion: false, allow_index_creation: true }}
|
||||
0 {uid: 0, status: enqueued, details: { settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: NotSet, sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: Set({"default": Set(EmbeddingSettings { source: Set(Rest), model: NotSet, revision: NotSet, api_key: Set("My super secret"), dimensions: Set(4), document_template: NotSet, url: Set("http://localhost:7777"), query: NotSet, input_field: NotSet, path_to_embeddings: NotSet, embedding_object: NotSet, input_type: NotSet, distribution: NotSet })}), search_cutoff_ms: NotSet, _kind: PhantomData<meilisearch_types::settings::Unchecked> } }, kind: SettingsUpdate { index_uid: "doggos", new_settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: NotSet, sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: Set({"default": Set(EmbeddingSettings { source: Set(Rest), model: NotSet, revision: NotSet, api_key: Set("My super secret"), dimensions: Set(4), document_template: NotSet, url: Set("http://localhost:7777"), query: NotSet, input_field: NotSet, path_to_embeddings: NotSet, embedding_object: NotSet, input_type: NotSet, distribution: NotSet })}), search_cutoff_ms: NotSet, _kind: PhantomData<meilisearch_types::settings::Unchecked> }, is_deletion: false, allow_index_creation: true }}
|
||||
----------------------------------------------------------------------
|
||||
### Status:
|
||||
enqueued [0,]
|
||||
|
@ -6,7 +6,7 @@ source: index-scheduler/src/lib.rs
|
||||
[]
|
||||
----------------------------------------------------------------------
|
||||
### All Tasks:
|
||||
0 {uid: 0, status: succeeded, details: { settings: Settings { displayed_attributes: NotSet, searchable_attributes: NotSet, filterable_attributes: NotSet, sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: Set({"default": Set(EmbeddingSettings { source: Set(Rest), model: NotSet, revision: NotSet, api_key: Set("My super secret"), dimensions: Set(4), document_template: NotSet, url: Set("http://localhost:7777"), query: NotSet, input_field: NotSet, path_to_embeddings: NotSet, embedding_object: NotSet, input_type: NotSet, distribution: NotSet })}), search_cutoff_ms: NotSet, _kind: PhantomData<meilisearch_types::settings::Unchecked> } }, kind: SettingsUpdate { index_uid: "doggos", new_settings: Settings { displayed_attributes: NotSet, searchable_attributes: NotSet, filterable_attributes: NotSet, sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: Set({"default": Set(EmbeddingSettings { source: Set(Rest), model: NotSet, revision: NotSet, api_key: Set("My super secret"), dimensions: Set(4), document_template: NotSet, url: Set("http://localhost:7777"), query: NotSet, input_field: NotSet, path_to_embeddings: NotSet, embedding_object: NotSet, input_type: NotSet, distribution: NotSet })}), search_cutoff_ms: NotSet, _kind: PhantomData<meilisearch_types::settings::Unchecked> }, is_deletion: false, allow_index_creation: true }}
|
||||
0 {uid: 0, status: succeeded, details: { settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: NotSet, sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: Set({"default": Set(EmbeddingSettings { source: Set(Rest), model: NotSet, revision: NotSet, api_key: Set("My super secret"), dimensions: Set(4), document_template: NotSet, url: Set("http://localhost:7777"), query: NotSet, input_field: NotSet, path_to_embeddings: NotSet, embedding_object: NotSet, input_type: NotSet, distribution: NotSet })}), search_cutoff_ms: NotSet, _kind: PhantomData<meilisearch_types::settings::Unchecked> } }, kind: SettingsUpdate { index_uid: "doggos", new_settings: Settings { displayed_attributes: WildcardSetting(NotSet), searchable_attributes: WildcardSetting(NotSet), filterable_attributes: NotSet, sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: Set({"default": Set(EmbeddingSettings { source: Set(Rest), model: NotSet, revision: NotSet, api_key: Set("My super secret"), dimensions: Set(4), document_template: NotSet, url: Set("http://localhost:7777"), query: NotSet, input_field: NotSet, path_to_embeddings: NotSet, embedding_object: NotSet, input_type: NotSet, distribution: NotSet })}), search_cutoff_ms: NotSet, _kind: PhantomData<meilisearch_types::settings::Unchecked> }, is_deletion: false, allow_index_creation: true }}
|
||||
----------------------------------------------------------------------
|
||||
### Status:
|
||||
enqueued []
|
||||
|
15
inspecting-allocator/Cargo.toml
Normal file
15
inspecting-allocator/Cargo.toml
Normal file
@ -0,0 +1,15 @@
|
||||
[package]
|
||||
name = "inspecting-allocator"
|
||||
version.workspace = true
|
||||
authors.workspace = true
|
||||
description.workspace = true
|
||||
homepage.workspace = true
|
||||
readme.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
tracing = "0.1.40"
|
||||
tracing-error = { version = "0.2.0", default-features = false }
|
160
inspecting-allocator/src/lib.rs
Normal file
160
inspecting-allocator/src/lib.rs
Normal file
@ -0,0 +1,160 @@
|
||||
use std::alloc::GlobalAlloc;
|
||||
use std::cell::{Cell, RefCell};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
|
||||
use tracing_error::SpanTrace;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct AllocEntry {
|
||||
generation: u64,
|
||||
span: SpanTrace,
|
||||
}
|
||||
|
||||
impl AllocEntry {
|
||||
pub fn generation(&self) -> u64 {
|
||||
self.generation
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for AllocEntry {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
let mut res = Ok(());
|
||||
let mut depth = 0;
|
||||
self.span.with_spans(|metadata, fields| {
|
||||
let name_with_module_name: Vec<&str> = metadata
|
||||
.module_path()
|
||||
.into_iter()
|
||||
.chain(std::iter::once(metadata.name()))
|
||||
.collect();
|
||||
let name_with_module_name = name_with_module_name.join("::");
|
||||
let location = format!(
|
||||
"{}:{}",
|
||||
metadata.file().unwrap_or_default(),
|
||||
metadata.line().unwrap_or_default()
|
||||
);
|
||||
if let Err(error) =
|
||||
writeln!(f, "[{depth}]{name_with_module_name}({fields}) at {location}")
|
||||
{
|
||||
res = Err(error);
|
||||
return false;
|
||||
}
|
||||
depth += 1;
|
||||
true
|
||||
});
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
struct AllocatorState {
|
||||
is_allocating: Cell<bool>,
|
||||
state: RefCell<HashMap<*mut u8, AllocEntry>>,
|
||||
}
|
||||
|
||||
thread_local! {
|
||||
static ALLOCATOR_STATE: AllocatorState = AllocatorState { is_allocating: Cell::new(false), state: RefCell::new(Default::default()) };
|
||||
}
|
||||
|
||||
pub struct InspectingAllocator<InnerAllocator> {
|
||||
inner: InnerAllocator,
|
||||
current_generation: AtomicU64,
|
||||
}
|
||||
|
||||
impl AllocatorState {
|
||||
fn handle_alloc(&self, allocated: *mut u8, current_generation: u64) -> *mut u8 {
|
||||
if self.is_allocating.get() {
|
||||
return allocated;
|
||||
}
|
||||
self.is_allocating.set(true);
|
||||
{
|
||||
self.state.borrow_mut().insert(
|
||||
allocated,
|
||||
AllocEntry { generation: current_generation, span: SpanTrace::capture() },
|
||||
);
|
||||
}
|
||||
self.is_allocating.set(false);
|
||||
|
||||
allocated
|
||||
}
|
||||
|
||||
fn handle_dealloc(&self, allocated: *mut u8) {
|
||||
if self.is_allocating.get() {
|
||||
return;
|
||||
}
|
||||
self.is_allocating.set(true);
|
||||
{
|
||||
self.state.borrow_mut().remove(&allocated);
|
||||
}
|
||||
self.is_allocating.set(false);
|
||||
}
|
||||
|
||||
fn find_older_generations(&self, older_generation: u64) -> Vec<(*mut u8, AllocEntry)> {
|
||||
if self.is_allocating.get() {
|
||||
return Vec::new();
|
||||
}
|
||||
self.is_allocating.set(true);
|
||||
let mut entries = Vec::new();
|
||||
self.state.borrow_mut().retain(|k, v| {
|
||||
if v.generation > older_generation {
|
||||
return true;
|
||||
}
|
||||
entries.push((*k, v.clone()));
|
||||
false
|
||||
});
|
||||
self.is_allocating.set(false);
|
||||
entries
|
||||
}
|
||||
}
|
||||
|
||||
impl<A> InspectingAllocator<A> {
|
||||
pub const fn wrap(inner: A) -> Self {
|
||||
Self { inner, current_generation: AtomicU64::new(0) }
|
||||
}
|
||||
|
||||
pub fn next_generation(&self) {
|
||||
self.current_generation.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn find_older_generations(&self, older_than: u64) -> Vec<(*mut u8, AllocEntry)> {
|
||||
let current_generation = self.current_generation.load(std::sync::atomic::Ordering::Relaxed);
|
||||
if current_generation < older_than {
|
||||
return Vec::new();
|
||||
}
|
||||
ALLOCATOR_STATE.with(|allocator_state| {
|
||||
allocator_state.find_older_generations(current_generation - older_than)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
unsafe impl<InnerAllocator: GlobalAlloc> GlobalAlloc for InspectingAllocator<InnerAllocator> {
|
||||
unsafe fn alloc(&self, layout: std::alloc::Layout) -> *mut u8 {
|
||||
let allocated = self.inner.alloc(layout);
|
||||
let current_generation = self.current_generation.load(std::sync::atomic::Ordering::Relaxed);
|
||||
ALLOCATOR_STATE
|
||||
.with(|allocator_state| allocator_state.handle_alloc(allocated, current_generation))
|
||||
}
|
||||
|
||||
unsafe fn dealloc(&self, ptr: *mut u8, layout: std::alloc::Layout) {
|
||||
self.inner.dealloc(ptr, layout);
|
||||
ALLOCATOR_STATE.with(|allocator_state| allocator_state.handle_dealloc(ptr))
|
||||
}
|
||||
|
||||
unsafe fn alloc_zeroed(&self, layout: std::alloc::Layout) -> *mut u8 {
|
||||
let allocated = self.inner.alloc_zeroed(layout);
|
||||
|
||||
let current_generation = self.current_generation.load(std::sync::atomic::Ordering::Relaxed);
|
||||
ALLOCATOR_STATE
|
||||
.with(|allocator_state| allocator_state.handle_alloc(allocated, current_generation))
|
||||
}
|
||||
|
||||
unsafe fn realloc(&self, ptr: *mut u8, layout: std::alloc::Layout, new_size: usize) -> *mut u8 {
|
||||
let reallocated = self.inner.realloc(ptr, layout, new_size);
|
||||
if reallocated == ptr {
|
||||
return reallocated;
|
||||
}
|
||||
let current_generation = self.current_generation.load(std::sync::atomic::Ordering::Relaxed);
|
||||
ALLOCATOR_STATE.with(|allocator_state| allocator_state.handle_dealloc(ptr));
|
||||
ALLOCATOR_STATE
|
||||
.with(|allocator_state| allocator_state.handle_alloc(reallocated, current_generation))
|
||||
}
|
||||
}
|
@ -57,3 +57,5 @@ greek = ["milli/greek"]
|
||||
khmer = ["milli/khmer"]
|
||||
# allow vietnamese specialized tokenization
|
||||
vietnamese = ["milli/vietnamese"]
|
||||
# force swedish character recomposition
|
||||
swedish-recomposition = ["milli/swedish-recomposition"]
|
||||
|
@ -3,7 +3,7 @@ use std::convert::Infallible;
|
||||
use std::fmt;
|
||||
use std::marker::PhantomData;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::ops::ControlFlow;
|
||||
use std::ops::{ControlFlow, Deref};
|
||||
use std::str::FromStr;
|
||||
|
||||
use deserr::{DeserializeError, Deserr, ErrorKind, MergeWithError, ValuePointerRef};
|
||||
@ -143,21 +143,13 @@ impl MergeWithError<milli::CriterionError> for DeserrJsonError<InvalidSettingsRa
|
||||
)]
|
||||
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
|
||||
pub struct Settings<T> {
|
||||
#[serde(
|
||||
default,
|
||||
serialize_with = "serialize_with_wildcard",
|
||||
skip_serializing_if = "Setting::is_not_set"
|
||||
)]
|
||||
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
|
||||
#[deserr(default, error = DeserrJsonError<InvalidSettingsDisplayedAttributes>)]
|
||||
pub displayed_attributes: Setting<Vec<String>>,
|
||||
pub displayed_attributes: WildcardSetting,
|
||||
|
||||
#[serde(
|
||||
default,
|
||||
serialize_with = "serialize_with_wildcard",
|
||||
skip_serializing_if = "Setting::is_not_set"
|
||||
)]
|
||||
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
|
||||
#[deserr(default, error = DeserrJsonError<InvalidSettingsSearchableAttributes>)]
|
||||
pub searchable_attributes: Setting<Vec<String>>,
|
||||
pub searchable_attributes: WildcardSetting,
|
||||
|
||||
#[serde(default, skip_serializing_if = "Setting::is_not_set")]
|
||||
#[deserr(default, error = DeserrJsonError<InvalidSettingsFilterableAttributes>)]
|
||||
@ -251,8 +243,8 @@ impl<T> Settings<T> {
|
||||
impl Settings<Checked> {
|
||||
pub fn cleared() -> Settings<Checked> {
|
||||
Settings {
|
||||
displayed_attributes: Setting::Reset,
|
||||
searchable_attributes: Setting::Reset,
|
||||
displayed_attributes: Setting::Reset.into(),
|
||||
searchable_attributes: Setting::Reset.into(),
|
||||
filterable_attributes: Setting::Reset,
|
||||
sortable_attributes: Setting::Reset,
|
||||
ranking_rules: Setting::Reset,
|
||||
@ -319,7 +311,7 @@ impl Settings<Checked> {
|
||||
|
||||
impl Settings<Unchecked> {
|
||||
pub fn check(self) -> Settings<Checked> {
|
||||
let displayed_attributes = match self.displayed_attributes {
|
||||
let displayed_attributes = match self.displayed_attributes.0 {
|
||||
Setting::Set(fields) => {
|
||||
if fields.iter().any(|f| f == "*") {
|
||||
Setting::Reset
|
||||
@ -330,7 +322,7 @@ impl Settings<Unchecked> {
|
||||
otherwise => otherwise,
|
||||
};
|
||||
|
||||
let searchable_attributes = match self.searchable_attributes {
|
||||
let searchable_attributes = match self.searchable_attributes.0 {
|
||||
Setting::Set(fields) => {
|
||||
if fields.iter().any(|f| f == "*") {
|
||||
Setting::Reset
|
||||
@ -342,8 +334,8 @@ impl Settings<Unchecked> {
|
||||
};
|
||||
|
||||
Settings {
|
||||
displayed_attributes,
|
||||
searchable_attributes,
|
||||
displayed_attributes: displayed_attributes.into(),
|
||||
searchable_attributes: searchable_attributes.into(),
|
||||
filterable_attributes: self.filterable_attributes,
|
||||
sortable_attributes: self.sortable_attributes,
|
||||
ranking_rules: self.ranking_rules,
|
||||
@ -412,13 +404,13 @@ pub fn apply_settings_to_builder(
|
||||
_kind,
|
||||
} = settings;
|
||||
|
||||
match searchable_attributes {
|
||||
match searchable_attributes.deref() {
|
||||
Setting::Set(ref names) => builder.set_searchable_fields(names.clone()),
|
||||
Setting::Reset => builder.reset_searchable_fields(),
|
||||
Setting::NotSet => (),
|
||||
}
|
||||
|
||||
match displayed_attributes {
|
||||
match displayed_attributes.deref() {
|
||||
Setting::Set(ref names) => builder.set_displayed_fields(names.clone()),
|
||||
Setting::Reset => builder.reset_displayed_fields(),
|
||||
Setting::NotSet => (),
|
||||
@ -690,11 +682,13 @@ pub fn settings(
|
||||
displayed_attributes: match displayed_attributes {
|
||||
Some(attrs) => Setting::Set(attrs),
|
||||
None => Setting::Reset,
|
||||
},
|
||||
}
|
||||
.into(),
|
||||
searchable_attributes: match searchable_attributes {
|
||||
Some(attrs) => Setting::Set(attrs),
|
||||
None => Setting::Reset,
|
||||
},
|
||||
}
|
||||
.into(),
|
||||
filterable_attributes: Setting::Set(filterable_attributes),
|
||||
sortable_attributes: Setting::Set(sortable_attributes),
|
||||
ranking_rules: Setting::Set(criteria.iter().map(|c| c.clone().into()).collect()),
|
||||
@ -848,6 +842,41 @@ impl From<ProximityPrecisionView> for ProximityPrecision {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, Deserialize, PartialEq, Eq)]
|
||||
pub struct WildcardSetting(Setting<Vec<String>>);
|
||||
|
||||
impl From<Setting<Vec<String>>> for WildcardSetting {
|
||||
fn from(setting: Setting<Vec<String>>) -> Self {
|
||||
Self(setting)
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for WildcardSetting {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
serialize_with_wildcard(&self.0, serializer)
|
||||
}
|
||||
}
|
||||
|
||||
impl<E: deserr::DeserializeError> Deserr<E> for WildcardSetting {
|
||||
fn deserialize_from_value<V: deserr::IntoValue>(
|
||||
value: deserr::Value<V>,
|
||||
location: ValuePointerRef<'_>,
|
||||
) -> Result<Self, E> {
|
||||
Ok(Self(Setting::deserialize_from_value(value, location)?))
|
||||
}
|
||||
}
|
||||
|
||||
impl std::ops::Deref for WildcardSetting {
|
||||
type Target = Setting<Vec<String>>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) mod test {
|
||||
use super::*;
|
||||
@ -856,8 +885,8 @@ pub(crate) mod test {
|
||||
fn test_setting_check() {
|
||||
// test no changes
|
||||
let settings = Settings {
|
||||
displayed_attributes: Setting::Set(vec![String::from("hello")]),
|
||||
searchable_attributes: Setting::Set(vec![String::from("hello")]),
|
||||
displayed_attributes: Setting::Set(vec![String::from("hello")]).into(),
|
||||
searchable_attributes: Setting::Set(vec![String::from("hello")]).into(),
|
||||
filterable_attributes: Setting::NotSet,
|
||||
sortable_attributes: Setting::NotSet,
|
||||
ranking_rules: Setting::NotSet,
|
||||
@ -883,8 +912,9 @@ pub(crate) mod test {
|
||||
// test wildcard
|
||||
// test no changes
|
||||
let settings = Settings {
|
||||
displayed_attributes: Setting::Set(vec![String::from("*")]),
|
||||
searchable_attributes: Setting::Set(vec![String::from("hello"), String::from("*")]),
|
||||
displayed_attributes: Setting::Set(vec![String::from("*")]).into(),
|
||||
searchable_attributes: Setting::Set(vec![String::from("hello"), String::from("*")])
|
||||
.into(),
|
||||
filterable_attributes: Setting::NotSet,
|
||||
sortable_attributes: Setting::NotSet,
|
||||
ranking_rules: Setting::NotSet,
|
||||
@ -904,7 +934,7 @@ pub(crate) mod test {
|
||||
};
|
||||
|
||||
let checked = settings.check();
|
||||
assert_eq!(checked.displayed_attributes, Setting::Reset);
|
||||
assert_eq!(checked.searchable_attributes, Setting::Reset);
|
||||
assert_eq!(checked.displayed_attributes, Setting::Reset.into());
|
||||
assert_eq!(checked.searchable_attributes, Setting::Reset.into());
|
||||
}
|
||||
}
|
||||
|
@ -108,6 +108,8 @@ tracing-subscriber = { version = "0.3.18", features = ["json"] }
|
||||
tracing-trace = { version = "0.1.0", path = "../tracing-trace" }
|
||||
tracing-actix-web = "0.7.9"
|
||||
build-info = { version = "1.7.0", path = "../build-info" }
|
||||
inspecting-allocator = { version = "1.8.0", path = "../inspecting-allocator" }
|
||||
tracing-error = { version = "0.2.0", default-features = false }
|
||||
|
||||
[dev-dependencies]
|
||||
actix-rt = "2.9.0"
|
||||
@ -156,6 +158,7 @@ thai = ["meilisearch-types/thai"]
|
||||
greek = ["meilisearch-types/greek"]
|
||||
khmer = ["meilisearch-types/khmer"]
|
||||
vietnamese = ["meilisearch-types/vietnamese"]
|
||||
swedish-recomposition = ["meilisearch-types/swedish-recomposition"]
|
||||
|
||||
[package.metadata.mini-dashboard]
|
||||
assets-url = "https://github.com/meilisearch/mini-dashboard/releases/download/v0.2.13/build.zip"
|
||||
|
@ -16,15 +16,11 @@ use meilisearch::{
|
||||
LogStderrType, Opt, SubscriberForSecondLayer,
|
||||
};
|
||||
use meilisearch_auth::{generate_master_key, AuthController, MASTER_KEY_MIN_SIZE};
|
||||
use mimalloc::MiMalloc;
|
||||
use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor};
|
||||
use tracing::level_filters::LevelFilter;
|
||||
use tracing_subscriber::layer::SubscriberExt as _;
|
||||
use tracing_subscriber::Layer;
|
||||
|
||||
#[global_allocator]
|
||||
static ALLOC: MiMalloc = MiMalloc;
|
||||
|
||||
fn default_log_route_layer() -> LogRouteType {
|
||||
None.with_filter(tracing_subscriber::filter::Targets::new().with_target("", LevelFilter::OFF))
|
||||
}
|
||||
@ -56,8 +52,10 @@ fn setup(opt: &Opt) -> anyhow::Result<(LogRouteHandle, LogStderrHandle)> {
|
||||
let (stderr_layer, stderr_layer_handle) =
|
||||
tracing_subscriber::reload::Layer::new(default_log_stderr_layer(opt));
|
||||
let route_layer: tracing_subscriber::reload::Layer<_, _> = route_layer;
|
||||
let error_layer = tracing_error::ErrorLayer::default();
|
||||
|
||||
let subscriber = tracing_subscriber::registry().with(route_layer).with(stderr_layer);
|
||||
let subscriber =
|
||||
tracing_subscriber::registry().with(route_layer).with(stderr_layer).with(error_layer);
|
||||
|
||||
// set the subscriber as the default for the application
|
||||
tracing::subscriber::set_global_default(subscriber).unwrap();
|
||||
|
@ -13,6 +13,7 @@ use byte_unit::{Byte, ByteError};
|
||||
use clap::Parser;
|
||||
use meilisearch_types::features::InstanceTogglableFeatures;
|
||||
use meilisearch_types::milli::update::IndexerConfig;
|
||||
use meilisearch_types::milli::ThreadPoolNoAbortBuilder;
|
||||
use rustls::server::{
|
||||
AllowAnyAnonymousOrAuthenticatedClient, AllowAnyAuthenticatedClient, ServerSessionMemoryCache,
|
||||
};
|
||||
@ -666,7 +667,7 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_from(other: &IndexerOpts) -> Result<Self, Self::Error> {
|
||||
let thread_pool = rayon::ThreadPoolBuilder::new()
|
||||
let thread_pool = ThreadPoolNoAbortBuilder::new()
|
||||
.thread_name(|index| format!("indexing-thread:{index}"))
|
||||
.num_threads(*other.max_indexing_threads)
|
||||
.build()?;
|
||||
|
@ -8,6 +8,7 @@ use deserr::actix_web::{AwebJson, AwebQueryParameter};
|
||||
use deserr::Deserr;
|
||||
use futures::StreamExt;
|
||||
use index_scheduler::{IndexScheduler, TaskId};
|
||||
use inspecting_allocator::InspectingAllocator;
|
||||
use meilisearch_types::deserr::query_params::Param;
|
||||
use meilisearch_types::deserr::{DeserrJsonError, DeserrQueryParamError};
|
||||
use meilisearch_types::document_formats::{read_csv, read_json, read_ndjson, PayloadType};
|
||||
@ -20,6 +21,7 @@ use meilisearch_types::milli::DocumentId;
|
||||
use meilisearch_types::star_or::OptionStarOrList;
|
||||
use meilisearch_types::tasks::KindWithContent;
|
||||
use meilisearch_types::{milli, Document, Index};
|
||||
use mimalloc::MiMalloc;
|
||||
use mime::Mime;
|
||||
use once_cell::sync::Lazy;
|
||||
use serde::Deserialize;
|
||||
@ -46,6 +48,9 @@ static ACCEPTED_CONTENT_TYPE: Lazy<Vec<String>> = Lazy::new(|| {
|
||||
vec!["application/json".to_string(), "application/x-ndjson".to_string(), "text/csv".to_string()]
|
||||
});
|
||||
|
||||
#[global_allocator]
|
||||
static ALLOC: InspectingAllocator<MiMalloc> = InspectingAllocator::wrap(MiMalloc);
|
||||
|
||||
/// Extracts the mime type from the content type and return
|
||||
/// a meilisearch error if anything bad happen.
|
||||
fn extract_mime_type(req: &HttpRequest) -> Result<Option<Mime>, MeilisearchHttpError> {
|
||||
@ -468,6 +473,14 @@ async fn document_addition(
|
||||
};
|
||||
|
||||
let scheduler = index_scheduler.clone();
|
||||
ALLOC.next_generation();
|
||||
for (address, entry) in ALLOC.find_older_generations(5) {
|
||||
println!(
|
||||
"Found allocation older than 5 generations: {address:p} in generation {}. Span trace",
|
||||
entry.generation()
|
||||
);
|
||||
println!("{entry}")
|
||||
}
|
||||
let task = match tokio::task::spawn_blocking(move || scheduler.register(task, task_id, dry_run))
|
||||
.await?
|
||||
{
|
||||
|
@ -137,10 +137,8 @@ macro_rules! make_setting_route {
|
||||
let settings = settings(&index, &rtxn, meilisearch_types::settings::SecretPolicy::HideSecrets)?;
|
||||
|
||||
debug!(returns = ?settings, "Update settings");
|
||||
let mut json = serde_json::json!(&settings);
|
||||
let val = json[$camelcase_attr].take();
|
||||
|
||||
Ok(HttpResponse::Ok().json(val))
|
||||
Ok(HttpResponse::Ok().json(settings.$attr))
|
||||
}
|
||||
|
||||
pub fn resources() -> Resource {
|
||||
|
@ -367,12 +367,6 @@ async fn get_version(
|
||||
})
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct KeysResponse {
|
||||
private: Option<String>,
|
||||
public: Option<String>,
|
||||
}
|
||||
|
||||
pub async fn get_health(
|
||||
index_scheduler: Data<IndexScheduler>,
|
||||
auth_controller: Data<AuthController>,
|
||||
|
@ -17,7 +17,7 @@ bincode = "1.3.3"
|
||||
bstr = "1.9.0"
|
||||
bytemuck = { version = "1.14.0", features = ["extern_crate_alloc"] }
|
||||
byteorder = "1.5.0"
|
||||
charabia = { version = "0.8.9", default-features = false }
|
||||
charabia = { version = "0.8.10", default-features = false }
|
||||
concat-arrays = "0.1.2"
|
||||
crossbeam-channel = "0.5.11"
|
||||
deserr = "0.6.1"
|
||||
@ -136,7 +136,11 @@ greek = ["charabia/greek"]
|
||||
# allow khmer specialized tokenization
|
||||
khmer = ["charabia/khmer"]
|
||||
|
||||
# allow vietnamese specialized tokenization
|
||||
vietnamese = ["charabia/vietnamese"]
|
||||
|
||||
# force swedish character recomposition
|
||||
swedish-recomposition = ["charabia/swedish-recomposition"]
|
||||
|
||||
# allow CUDA support, see <https://github.com/meilisearch/meilisearch/issues/4306>
|
||||
cuda = ["candle-core/cuda"]
|
||||
|
@ -9,6 +9,7 @@ use serde_json::Value;
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::documents::{self, DocumentsBatchCursorError};
|
||||
use crate::thread_pool_no_abort::PanicCatched;
|
||||
use crate::{CriterionError, DocumentId, FieldId, Object, SortError};
|
||||
|
||||
pub fn is_reserved_keyword(keyword: &str) -> bool {
|
||||
@ -39,17 +40,19 @@ pub enum InternalError {
|
||||
Fst(#[from] fst::Error),
|
||||
#[error(transparent)]
|
||||
DocumentsError(#[from] documents::Error),
|
||||
#[error("Invalid compression type have been specified to grenad.")]
|
||||
#[error("Invalid compression type have been specified to grenad")]
|
||||
GrenadInvalidCompressionType,
|
||||
#[error("Invalid grenad file with an invalid version format.")]
|
||||
#[error("Invalid grenad file with an invalid version format")]
|
||||
GrenadInvalidFormatVersion,
|
||||
#[error("Invalid merge while processing {process}.")]
|
||||
#[error("Invalid merge while processing {process}")]
|
||||
IndexingMergingKeys { process: &'static str },
|
||||
#[error("{}", HeedError::InvalidDatabaseTyping)]
|
||||
InvalidDatabaseTyping,
|
||||
#[error(transparent)]
|
||||
RayonThreadPool(#[from] ThreadPoolBuildError),
|
||||
#[error(transparent)]
|
||||
PanicInThreadPool(#[from] PanicCatched),
|
||||
#[error(transparent)]
|
||||
SerdeJson(#[from] serde_json::Error),
|
||||
#[error(transparent)]
|
||||
Serialization(#[from] SerializationError),
|
||||
@ -57,9 +60,9 @@ pub enum InternalError {
|
||||
Store(#[from] MdbError),
|
||||
#[error(transparent)]
|
||||
Utf8(#[from] str::Utf8Error),
|
||||
#[error("An indexation process was explicitly aborted.")]
|
||||
#[error("An indexation process was explicitly aborted")]
|
||||
AbortedIndexation,
|
||||
#[error("The matching words list contains at least one invalid member.")]
|
||||
#[error("The matching words list contains at least one invalid member")]
|
||||
InvalidMatchingWords,
|
||||
#[error(transparent)]
|
||||
ArroyError(#[from] arroy::Error),
|
||||
|
@ -21,6 +21,7 @@ pub mod prompt;
|
||||
pub mod proximity;
|
||||
pub mod score_details;
|
||||
mod search;
|
||||
mod thread_pool_no_abort;
|
||||
pub mod update;
|
||||
pub mod vector;
|
||||
|
||||
@ -42,6 +43,7 @@ pub use search::new::{
|
||||
SearchLogger, VisualSearchLogger,
|
||||
};
|
||||
use serde_json::Value;
|
||||
pub use thread_pool_no_abort::{PanicCatched, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
|
||||
pub use {charabia as tokenizer, heed};
|
||||
|
||||
pub use self::asc_desc::{AscDesc, AscDescError, Member, SortError};
|
||||
|
69
milli/src/thread_pool_no_abort.rs
Normal file
69
milli/src/thread_pool_no_abort.rs
Normal file
@ -0,0 +1,69 @@
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use rayon::{ThreadPool, ThreadPoolBuilder};
|
||||
use thiserror::Error;
|
||||
|
||||
/// A rayon ThreadPool wrapper that can catch panics in the pool
|
||||
/// and modifies the install function accordingly.
|
||||
#[derive(Debug)]
|
||||
pub struct ThreadPoolNoAbort {
|
||||
thread_pool: ThreadPool,
|
||||
/// Set to true if the thread pool catched a panic.
|
||||
pool_catched_panic: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl ThreadPoolNoAbort {
|
||||
pub fn install<OP, R>(&self, op: OP) -> Result<R, PanicCatched>
|
||||
where
|
||||
OP: FnOnce() -> R + Send,
|
||||
R: Send,
|
||||
{
|
||||
let output = self.thread_pool.install(op);
|
||||
// While reseting the pool panic catcher we return an error if we catched one.
|
||||
if self.pool_catched_panic.swap(false, Ordering::SeqCst) {
|
||||
Err(PanicCatched)
|
||||
} else {
|
||||
Ok(output)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn current_num_threads(&self) -> usize {
|
||||
self.thread_pool.current_num_threads()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
#[error("A panic occured. Read the logs to find more information about it")]
|
||||
pub struct PanicCatched;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct ThreadPoolNoAbortBuilder(ThreadPoolBuilder);
|
||||
|
||||
impl ThreadPoolNoAbortBuilder {
|
||||
pub fn new() -> ThreadPoolNoAbortBuilder {
|
||||
ThreadPoolNoAbortBuilder::default()
|
||||
}
|
||||
|
||||
pub fn thread_name<F>(mut self, closure: F) -> Self
|
||||
where
|
||||
F: FnMut(usize) -> String + 'static,
|
||||
{
|
||||
self.0 = self.0.thread_name(closure);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn num_threads(mut self, num_threads: usize) -> ThreadPoolNoAbortBuilder {
|
||||
self.0 = self.0.num_threads(num_threads);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn build(mut self) -> Result<ThreadPoolNoAbort, rayon::ThreadPoolBuildError> {
|
||||
let pool_catched_panic = Arc::new(AtomicBool::new(false));
|
||||
self.0 = self.0.panic_handler({
|
||||
let catched_panic = pool_catched_panic.clone();
|
||||
move |_result| catched_panic.store(true, Ordering::SeqCst)
|
||||
});
|
||||
Ok(ThreadPoolNoAbort { thread_pool: self.0.build()?, pool_catched_panic })
|
||||
}
|
||||
}
|
@ -19,7 +19,7 @@ use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
|
||||
use crate::update::index_documents::helpers::try_split_at;
|
||||
use crate::update::settings::InnerIndexSettingsDiff;
|
||||
use crate::vector::Embedder;
|
||||
use crate::{DocumentId, InternalError, Result, VectorOrArrayOfVectors};
|
||||
use crate::{DocumentId, InternalError, Result, ThreadPoolNoAbort, VectorOrArrayOfVectors};
|
||||
|
||||
/// The length of the elements that are always in the buffer when inserting new values.
|
||||
const TRUNCATE_SIZE: usize = size_of::<DocumentId>();
|
||||
@ -362,7 +362,7 @@ pub fn extract_embeddings<R: io::Read + io::Seek>(
|
||||
prompt_reader: grenad::Reader<R>,
|
||||
indexer: GrenadParameters,
|
||||
embedder: Arc<Embedder>,
|
||||
request_threads: &rayon::ThreadPool,
|
||||
request_threads: &ThreadPoolNoAbort,
|
||||
) -> Result<grenad::Reader<BufReader<File>>> {
|
||||
puffin::profile_function!();
|
||||
let n_chunks = embedder.chunk_count_hint(); // chunk level parallelism
|
||||
|
@ -31,7 +31,7 @@ use self::extract_word_position_docids::extract_word_position_docids;
|
||||
use super::helpers::{as_cloneable_grenad, CursorClonableMmap, GrenadParameters};
|
||||
use super::{helpers, TypedChunk};
|
||||
use crate::update::settings::InnerIndexSettingsDiff;
|
||||
use crate::{FieldId, Result};
|
||||
use crate::{FieldId, Result, ThreadPoolNoAbortBuilder};
|
||||
|
||||
/// Extract data for each databases from obkv documents in parallel.
|
||||
/// Send data in grenad file over provided Sender.
|
||||
@ -229,7 +229,7 @@ fn send_original_documents_data(
|
||||
let documents_chunk_cloned = original_documents_chunk.clone();
|
||||
let lmdb_writer_sx_cloned = lmdb_writer_sx.clone();
|
||||
|
||||
let request_threads = rayon::ThreadPoolBuilder::new()
|
||||
let request_threads = ThreadPoolNoAbortBuilder::new()
|
||||
.num_threads(crate::vector::REQUEST_PARALLELISM)
|
||||
.thread_name(|index| format!("embedding-request-{index}"))
|
||||
.build()?;
|
||||
|
@ -33,6 +33,7 @@ use self::helpers::{grenad_obkv_into_chunks, GrenadParameters};
|
||||
pub use self::transform::{Transform, TransformOutput};
|
||||
use crate::documents::{obkv_to_object, DocumentsBatchReader};
|
||||
use crate::error::{Error, InternalError, UserError};
|
||||
use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder;
|
||||
pub use crate::update::index_documents::helpers::CursorClonableMmap;
|
||||
use crate::update::{
|
||||
IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst,
|
||||
@ -298,18 +299,18 @@ where
|
||||
let backup_pool;
|
||||
let pool = match self.indexer_config.thread_pool {
|
||||
Some(ref pool) => pool,
|
||||
#[cfg(not(test))]
|
||||
None => {
|
||||
// We initialize a bakcup pool with the default
|
||||
// We initialize a backup pool with the default
|
||||
// settings if none have already been set.
|
||||
backup_pool = rayon::ThreadPoolBuilder::new().build()?;
|
||||
&backup_pool
|
||||
}
|
||||
#[cfg(test)]
|
||||
None => {
|
||||
// We initialize a bakcup pool with the default
|
||||
// settings if none have already been set.
|
||||
backup_pool = rayon::ThreadPoolBuilder::new().num_threads(1).build()?;
|
||||
#[allow(unused_mut)]
|
||||
let mut pool_builder = ThreadPoolNoAbortBuilder::new();
|
||||
|
||||
#[cfg(test)]
|
||||
{
|
||||
pool_builder = pool_builder.num_threads(1);
|
||||
}
|
||||
|
||||
backup_pool = pool_builder.build()?;
|
||||
&backup_pool
|
||||
}
|
||||
};
|
||||
@ -533,7 +534,7 @@ where
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})?;
|
||||
}).map_err(InternalError::from)??;
|
||||
|
||||
// We write the field distribution into the main database
|
||||
self.index.put_field_distribution(self.wtxn, &field_distribution)?;
|
||||
@ -562,7 +563,8 @@ where
|
||||
writer.build(wtxn, &mut rng, None)?;
|
||||
}
|
||||
Result::Ok(())
|
||||
})?;
|
||||
})
|
||||
.map_err(InternalError::from)??;
|
||||
}
|
||||
|
||||
self.execute_prefix_databases(
|
||||
|
@ -1,5 +1,6 @@
|
||||
use grenad::CompressionType;
|
||||
use rayon::ThreadPool;
|
||||
|
||||
use crate::thread_pool_no_abort::ThreadPoolNoAbort;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct IndexerConfig {
|
||||
@ -9,7 +10,7 @@ pub struct IndexerConfig {
|
||||
pub max_memory: Option<usize>,
|
||||
pub chunk_compression_type: CompressionType,
|
||||
pub chunk_compression_level: Option<u32>,
|
||||
pub thread_pool: Option<ThreadPool>,
|
||||
pub thread_pool: Option<ThreadPoolNoAbort>,
|
||||
pub max_positions_per_attributes: Option<u32>,
|
||||
pub skip_index_budget: bool,
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ use std::path::PathBuf;
|
||||
use hf_hub::api::sync::ApiError;
|
||||
|
||||
use crate::error::FaultSource;
|
||||
use crate::PanicCatched;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[error("Error while generating embeddings: {inner}")]
|
||||
@ -80,6 +81,8 @@ pub enum EmbedErrorKind {
|
||||
OpenAiUnexpectedDimension(usize, usize),
|
||||
#[error("no embedding was produced")]
|
||||
MissingEmbedding,
|
||||
#[error(transparent)]
|
||||
PanicInThreadPool(#[from] PanicCatched),
|
||||
}
|
||||
|
||||
impl EmbedError {
|
||||
|
@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize};
|
||||
|
||||
use self::error::{EmbedError, NewEmbedderError};
|
||||
use crate::prompt::{Prompt, PromptData};
|
||||
use crate::ThreadPoolNoAbort;
|
||||
|
||||
pub mod error;
|
||||
pub mod hf;
|
||||
@ -254,7 +255,7 @@ impl Embedder {
|
||||
pub fn embed_chunks(
|
||||
&self,
|
||||
text_chunks: Vec<Vec<String>>,
|
||||
threads: &rayon::ThreadPool,
|
||||
threads: &ThreadPoolNoAbort,
|
||||
) -> std::result::Result<Vec<Vec<Embeddings<f32>>>, EmbedError> {
|
||||
match self {
|
||||
Embedder::HuggingFace(embedder) => embedder.embed_chunks(text_chunks),
|
||||
|
@ -3,6 +3,8 @@ use rayon::iter::{IntoParallelIterator as _, ParallelIterator as _};
|
||||
use super::error::{EmbedError, EmbedErrorKind, NewEmbedderError, NewEmbedderErrorKind};
|
||||
use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions};
|
||||
use super::{DistributionShift, Embeddings};
|
||||
use crate::error::FaultSource;
|
||||
use crate::ThreadPoolNoAbort;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct Embedder {
|
||||
@ -71,11 +73,16 @@ impl Embedder {
|
||||
pub fn embed_chunks(
|
||||
&self,
|
||||
text_chunks: Vec<Vec<String>>,
|
||||
threads: &rayon::ThreadPool,
|
||||
threads: &ThreadPoolNoAbort,
|
||||
) -> Result<Vec<Vec<Embeddings<f32>>>, EmbedError> {
|
||||
threads.install(move || {
|
||||
text_chunks.into_par_iter().map(move |chunk| self.embed(chunk)).collect()
|
||||
})
|
||||
threads
|
||||
.install(move || {
|
||||
text_chunks.into_par_iter().map(move |chunk| self.embed(chunk)).collect()
|
||||
})
|
||||
.map_err(|error| EmbedError {
|
||||
kind: EmbedErrorKind::PanicInThreadPool(error),
|
||||
fault: FaultSource::Bug,
|
||||
})?
|
||||
}
|
||||
|
||||
pub fn chunk_count_hint(&self) -> usize {
|
||||
|
@ -4,7 +4,9 @@ use rayon::iter::{IntoParallelIterator, ParallelIterator as _};
|
||||
use super::error::{EmbedError, NewEmbedderError};
|
||||
use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions};
|
||||
use super::{DistributionShift, Embeddings};
|
||||
use crate::error::FaultSource;
|
||||
use crate::vector::error::EmbedErrorKind;
|
||||
use crate::ThreadPoolNoAbort;
|
||||
|
||||
#[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
|
||||
pub struct EmbedderOptions {
|
||||
@ -241,11 +243,16 @@ impl Embedder {
|
||||
pub fn embed_chunks(
|
||||
&self,
|
||||
text_chunks: Vec<Vec<String>>,
|
||||
threads: &rayon::ThreadPool,
|
||||
threads: &ThreadPoolNoAbort,
|
||||
) -> Result<Vec<Vec<Embeddings<f32>>>, EmbedError> {
|
||||
threads.install(move || {
|
||||
text_chunks.into_par_iter().map(move |chunk| self.embed(chunk)).collect()
|
||||
})
|
||||
threads
|
||||
.install(move || {
|
||||
text_chunks.into_par_iter().map(move |chunk| self.embed(chunk)).collect()
|
||||
})
|
||||
.map_err(|error| EmbedError {
|
||||
kind: EmbedErrorKind::PanicInThreadPool(error),
|
||||
fault: FaultSource::Bug,
|
||||
})?
|
||||
}
|
||||
|
||||
pub fn chunk_count_hint(&self) -> usize {
|
||||
|
@ -2,9 +2,12 @@ use deserr::Deserr;
|
||||
use rayon::iter::{IntoParallelIterator as _, ParallelIterator as _};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::error::EmbedErrorKind;
|
||||
use super::{
|
||||
DistributionShift, EmbedError, Embedding, Embeddings, NewEmbedderError, REQUEST_PARALLELISM,
|
||||
};
|
||||
use crate::error::FaultSource;
|
||||
use crate::ThreadPoolNoAbort;
|
||||
|
||||
// retrying in case of failure
|
||||
|
||||
@ -158,11 +161,16 @@ impl Embedder {
|
||||
pub fn embed_chunks(
|
||||
&self,
|
||||
text_chunks: Vec<Vec<String>>,
|
||||
threads: &rayon::ThreadPool,
|
||||
threads: &ThreadPoolNoAbort,
|
||||
) -> Result<Vec<Vec<Embeddings<f32>>>, EmbedError> {
|
||||
threads.install(move || {
|
||||
text_chunks.into_par_iter().map(move |chunk| self.embed(chunk)).collect()
|
||||
})
|
||||
threads
|
||||
.install(move || {
|
||||
text_chunks.into_par_iter().map(move |chunk| self.embed(chunk)).collect()
|
||||
})
|
||||
.map_err(|error| EmbedError {
|
||||
kind: EmbedErrorKind::PanicInThreadPool(error),
|
||||
fault: FaultSource::Bug,
|
||||
})?
|
||||
}
|
||||
|
||||
pub fn chunk_count_hint(&self) -> usize {
|
||||
|
@ -301,10 +301,14 @@ impl From<EmbeddingConfig> for EmbeddingSettings {
|
||||
fn from(value: EmbeddingConfig) -> Self {
|
||||
let EmbeddingConfig { embedder_options, prompt } = value;
|
||||
match embedder_options {
|
||||
super::EmbedderOptions::HuggingFace(options) => Self {
|
||||
super::EmbedderOptions::HuggingFace(super::hf::EmbedderOptions {
|
||||
model,
|
||||
revision,
|
||||
distribution,
|
||||
}) => Self {
|
||||
source: Setting::Set(EmbedderSource::HuggingFace),
|
||||
model: Setting::Set(options.model),
|
||||
revision: options.revision.map(Setting::Set).unwrap_or_default(),
|
||||
model: Setting::Set(model),
|
||||
revision: revision.map(Setting::Set).unwrap_or_default(),
|
||||
api_key: Setting::NotSet,
|
||||
dimensions: Setting::NotSet,
|
||||
document_template: Setting::Set(prompt.template),
|
||||
@ -314,14 +318,19 @@ impl From<EmbeddingConfig> for EmbeddingSettings {
|
||||
path_to_embeddings: Setting::NotSet,
|
||||
embedding_object: Setting::NotSet,
|
||||
input_type: Setting::NotSet,
|
||||
distribution: options.distribution.map(Setting::Set).unwrap_or_default(),
|
||||
distribution: distribution.map(Setting::Set).unwrap_or_default(),
|
||||
},
|
||||
super::EmbedderOptions::OpenAi(options) => Self {
|
||||
super::EmbedderOptions::OpenAi(super::openai::EmbedderOptions {
|
||||
api_key,
|
||||
embedding_model,
|
||||
dimensions,
|
||||
distribution,
|
||||
}) => Self {
|
||||
source: Setting::Set(EmbedderSource::OpenAi),
|
||||
model: Setting::Set(options.embedding_model.name().to_owned()),
|
||||
model: Setting::Set(embedding_model.name().to_owned()),
|
||||
revision: Setting::NotSet,
|
||||
api_key: options.api_key.map(Setting::Set).unwrap_or_default(),
|
||||
dimensions: options.dimensions.map(Setting::Set).unwrap_or_default(),
|
||||
api_key: api_key.map(Setting::Set).unwrap_or_default(),
|
||||
dimensions: dimensions.map(Setting::Set).unwrap_or_default(),
|
||||
document_template: Setting::Set(prompt.template),
|
||||
url: Setting::NotSet,
|
||||
query: Setting::NotSet,
|
||||
@ -329,29 +338,37 @@ impl From<EmbeddingConfig> for EmbeddingSettings {
|
||||
path_to_embeddings: Setting::NotSet,
|
||||
embedding_object: Setting::NotSet,
|
||||
input_type: Setting::NotSet,
|
||||
distribution: options.distribution.map(Setting::Set).unwrap_or_default(),
|
||||
distribution: distribution.map(Setting::Set).unwrap_or_default(),
|
||||
},
|
||||
super::EmbedderOptions::Ollama(options) => Self {
|
||||
super::EmbedderOptions::Ollama(super::ollama::EmbedderOptions {
|
||||
embedding_model,
|
||||
url,
|
||||
api_key,
|
||||
distribution,
|
||||
}) => Self {
|
||||
source: Setting::Set(EmbedderSource::Ollama),
|
||||
model: Setting::Set(options.embedding_model.to_owned()),
|
||||
model: Setting::Set(embedding_model),
|
||||
revision: Setting::NotSet,
|
||||
api_key: Setting::NotSet,
|
||||
api_key: api_key.map(Setting::Set).unwrap_or_default(),
|
||||
dimensions: Setting::NotSet,
|
||||
document_template: Setting::Set(prompt.template),
|
||||
url: Setting::NotSet,
|
||||
url: url.map(Setting::Set).unwrap_or_default(),
|
||||
query: Setting::NotSet,
|
||||
input_field: Setting::NotSet,
|
||||
path_to_embeddings: Setting::NotSet,
|
||||
embedding_object: Setting::NotSet,
|
||||
input_type: Setting::NotSet,
|
||||
distribution: options.distribution.map(Setting::Set).unwrap_or_default(),
|
||||
distribution: distribution.map(Setting::Set).unwrap_or_default(),
|
||||
},
|
||||
super::EmbedderOptions::UserProvided(options) => Self {
|
||||
super::EmbedderOptions::UserProvided(super::manual::EmbedderOptions {
|
||||
dimensions,
|
||||
distribution,
|
||||
}) => Self {
|
||||
source: Setting::Set(EmbedderSource::UserProvided),
|
||||
model: Setting::NotSet,
|
||||
revision: Setting::NotSet,
|
||||
api_key: Setting::NotSet,
|
||||
dimensions: Setting::Set(options.dimensions),
|
||||
dimensions: Setting::Set(dimensions),
|
||||
document_template: Setting::NotSet,
|
||||
url: Setting::NotSet,
|
||||
query: Setting::NotSet,
|
||||
@ -359,7 +376,7 @@ impl From<EmbeddingConfig> for EmbeddingSettings {
|
||||
path_to_embeddings: Setting::NotSet,
|
||||
embedding_object: Setting::NotSet,
|
||||
input_type: Setting::NotSet,
|
||||
distribution: options.distribution.map(Setting::Set).unwrap_or_default(),
|
||||
distribution: distribution.map(Setting::Set).unwrap_or_default(),
|
||||
},
|
||||
super::EmbedderOptions::Rest(super::rest::EmbedderOptions {
|
||||
api_key,
|
||||
|
@ -217,9 +217,7 @@ fn add_memory_samples(
|
||||
memory_counters: &mut Option<MemoryCounterHandles>,
|
||||
last_memory: &mut MemoryStats,
|
||||
) -> Option<MemoryStats> {
|
||||
let Some(stats) = memory else {
|
||||
return None;
|
||||
};
|
||||
let stats = memory?;
|
||||
|
||||
let memory_counters =
|
||||
memory_counters.get_or_insert_with(|| MemoryCounterHandles::new(profile, main));
|
||||
|
Reference in New Issue
Block a user