mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-11-30 01:35:36 +00:00
Compare commits
18 Commits
release-v1
...
prototype-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e0f446e4d3 | ||
|
|
3bbad823e0 | ||
|
|
b605549bf2 | ||
|
|
6a1062edf5 | ||
|
|
426ea5aa97 | ||
|
|
e20b91210d | ||
|
|
17478301ab | ||
|
|
968c9dff27 | ||
|
|
463553988c | ||
|
|
c321fdb9c0 | ||
|
|
36b6e94b29 | ||
|
|
34dea863e5 | ||
|
|
ad9d8e10f2 | ||
|
|
f7f35ef37c | ||
|
|
c575d2693b | ||
|
|
024e06f7e3 | ||
|
|
145fa3a8ff | ||
|
|
d3a7e10348 |
34
Cargo.lock
generated
34
Cargo.lock
generated
@@ -496,7 +496,7 @@ source = "git+https://github.com/meilisearch/bbqueue#cbb87cc707b5af415ef203bdaf2
|
||||
|
||||
[[package]]
|
||||
name = "benchmarks"
|
||||
version = "1.12.5"
|
||||
version = "1.12.7"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"bumpalo",
|
||||
@@ -689,7 +689,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "build-info"
|
||||
version = "1.12.5"
|
||||
version = "1.12.7"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"time",
|
||||
@@ -1664,7 +1664,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "dump"
|
||||
version = "1.12.5"
|
||||
version = "1.12.7"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"big_s",
|
||||
@@ -1876,7 +1876,7 @@ checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4"
|
||||
|
||||
[[package]]
|
||||
name = "file-store"
|
||||
version = "1.12.5"
|
||||
version = "1.12.7"
|
||||
dependencies = [
|
||||
"tempfile",
|
||||
"thiserror",
|
||||
@@ -1898,7 +1898,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "filter-parser"
|
||||
version = "1.12.5"
|
||||
version = "1.12.7"
|
||||
dependencies = [
|
||||
"insta",
|
||||
"nom",
|
||||
@@ -1918,7 +1918,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "flatten-serde-json"
|
||||
version = "1.12.5"
|
||||
version = "1.12.7"
|
||||
dependencies = [
|
||||
"criterion",
|
||||
"serde_json",
|
||||
@@ -2057,7 +2057,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "fuzzers"
|
||||
version = "1.12.5"
|
||||
version = "1.12.7"
|
||||
dependencies = [
|
||||
"arbitrary",
|
||||
"bumpalo",
|
||||
@@ -2624,7 +2624,7 @@ checksum = "206ca75c9c03ba3d4ace2460e57b189f39f43de612c2f85836e65c929701bb2d"
|
||||
|
||||
[[package]]
|
||||
name = "index-scheduler"
|
||||
version = "1.12.5"
|
||||
version = "1.12.7"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
@@ -2822,7 +2822,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "json-depth-checker"
|
||||
version = "1.12.5"
|
||||
version = "1.12.7"
|
||||
dependencies = [
|
||||
"criterion",
|
||||
"serde_json",
|
||||
@@ -3441,7 +3441,7 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
|
||||
|
||||
[[package]]
|
||||
name = "meili-snap"
|
||||
version = "1.12.5"
|
||||
version = "1.12.7"
|
||||
dependencies = [
|
||||
"insta",
|
||||
"md5",
|
||||
@@ -3450,7 +3450,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "meilisearch"
|
||||
version = "1.12.5"
|
||||
version = "1.12.7"
|
||||
dependencies = [
|
||||
"actix-cors",
|
||||
"actix-http",
|
||||
@@ -3540,7 +3540,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "meilisearch-auth"
|
||||
version = "1.12.5"
|
||||
version = "1.12.7"
|
||||
dependencies = [
|
||||
"base64 0.22.1",
|
||||
"enum-iterator",
|
||||
@@ -3559,7 +3559,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "meilisearch-types"
|
||||
version = "1.12.5"
|
||||
version = "1.12.7"
|
||||
dependencies = [
|
||||
"actix-web",
|
||||
"anyhow",
|
||||
@@ -3592,7 +3592,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "meilitool"
|
||||
version = "1.12.5"
|
||||
version = "1.12.7"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"arroy 0.5.0 (git+https://github.com/meilisearch/arroy/?tag=DO-NOT-DELETE-upgrade-v04-to-v05)",
|
||||
@@ -3627,7 +3627,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "milli"
|
||||
version = "1.12.5"
|
||||
version = "1.12.7"
|
||||
dependencies = [
|
||||
"allocator-api2",
|
||||
"arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
@@ -4083,7 +4083,7 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
|
||||
|
||||
[[package]]
|
||||
name = "permissive-json-pointer"
|
||||
version = "1.12.5"
|
||||
version = "1.12.7"
|
||||
dependencies = [
|
||||
"big_s",
|
||||
"serde_json",
|
||||
@@ -6486,7 +6486,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "xtask"
|
||||
version = "1.12.5"
|
||||
version = "1.12.7"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"build-info",
|
||||
|
||||
@@ -22,7 +22,7 @@ members = [
|
||||
]
|
||||
|
||||
[workspace.package]
|
||||
version = "1.12.5"
|
||||
version = "1.12.7"
|
||||
authors = [
|
||||
"Quentin de Quelen <quentin@dequelen.me>",
|
||||
"Clément Renault <clement@meilisearch.com>",
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::env::VarError;
|
||||
use std::path::Path;
|
||||
use std::str::FromStr;
|
||||
use std::time::Duration;
|
||||
|
||||
use meilisearch_types::heed::{EnvClosingEvent, EnvFlags, EnvOpenOptions};
|
||||
@@ -302,7 +304,15 @@ fn create_or_open_index(
|
||||
) -> Result<Index> {
|
||||
let mut options = EnvOpenOptions::new();
|
||||
options.map_size(clamp_to_page_size(map_size));
|
||||
options.max_readers(1024);
|
||||
|
||||
let max_readers = match std::env::var("MEILI_EXPERIMENTAL_INDEX_MAX_READERS") {
|
||||
Ok(value) => u32::from_str(&value).unwrap(),
|
||||
Err(VarError::NotPresent) => 1024,
|
||||
Err(VarError::NotUnicode(value)) => panic!(
|
||||
"Invalid unicode for the `MEILI_EXPERIMENTAL_INDEX_MAX_READERS` env var: {value:?}"
|
||||
),
|
||||
};
|
||||
options.max_readers(max_readers);
|
||||
if enable_mdb_writemap {
|
||||
unsafe { options.flags(EnvFlags::WRITE_MAP) };
|
||||
}
|
||||
|
||||
@@ -1746,3 +1746,57 @@ async fn change_attributes_settings() {
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Modifying facets with different casing should work correctly
|
||||
#[actix_rt::test]
|
||||
async fn change_facet_casing() {
|
||||
let server = Server::new().await;
|
||||
let index = server.index("test");
|
||||
|
||||
let (response, code) = index
|
||||
.update_settings(json!({
|
||||
"filterableAttributes": ["dog"],
|
||||
}))
|
||||
.await;
|
||||
assert_eq!("202", code.as_str(), "{:?}", response);
|
||||
index.wait_task(response.uid()).await;
|
||||
|
||||
let (response, _code) = index
|
||||
.add_documents(
|
||||
json!([
|
||||
{
|
||||
"id": 1,
|
||||
"dog": "Bouvier Bernois"
|
||||
}
|
||||
]),
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
index.wait_task(response.uid()).await;
|
||||
|
||||
let (response, _code) = index
|
||||
.add_documents(
|
||||
json!([
|
||||
{
|
||||
"id": 1,
|
||||
"dog": "bouvier bernois"
|
||||
}
|
||||
]),
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
index.wait_task(response.uid()).await;
|
||||
|
||||
index
|
||||
.search(json!({ "facets": ["dog"] }), |response, code| {
|
||||
meili_snap::snapshot!(code, @"200 OK");
|
||||
meili_snap::snapshot!(meili_snap::json_string!(response["facetDistribution"]), @r###"
|
||||
{
|
||||
"dog": {
|
||||
"bouvier bernois": 1
|
||||
}
|
||||
}
|
||||
"###);
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
@@ -44,9 +44,9 @@ impl OfflineUpgrade {
|
||||
}
|
||||
|
||||
const FIRST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.9.0";
|
||||
const LAST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.12.5";
|
||||
const LAST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.12.7";
|
||||
const FIRST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.10.0";
|
||||
const LAST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.12.5";
|
||||
const LAST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.12.7";
|
||||
|
||||
let upgrade_list = [
|
||||
(
|
||||
@@ -73,7 +73,7 @@ impl OfflineUpgrade {
|
||||
("1", "10", _) => 1,
|
||||
("1", "11", _) => 2,
|
||||
("1", "12", "0" | "1" | "2") => 3,
|
||||
("1", "12", "3" | "4" | "5") => no_upgrade,
|
||||
("1", "12", "3" | "4" | "5" | "6" | "7") => no_upgrade,
|
||||
_ => {
|
||||
bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from versions in range [{}-{}]",
|
||||
FIRST_SUPPORTED_UPGRADE_FROM_VERSION,
|
||||
@@ -87,7 +87,7 @@ impl OfflineUpgrade {
|
||||
("1", "10", _) => 0,
|
||||
("1", "11", _) => 1,
|
||||
("1", "12", "0" | "1" | "2") => 2,
|
||||
("1", "12", "3" | "4" | "5") => 3,
|
||||
("1", "12", "3" | "4" | "5" | "6" | "7") => 3,
|
||||
(major, _, _) if major.starts_with('v') => {
|
||||
bail!("Target version must not starts with a `v`. Instead of writing `v1.9.0` write `1.9.0` for example.")
|
||||
}
|
||||
|
||||
@@ -219,12 +219,19 @@ impl<'a> FacetDistribution<'a> {
|
||||
let facet_key = StrRefCodec::bytes_decode(facet_key).unwrap();
|
||||
|
||||
let key: (FieldId, _, &str) = (field_id, any_docid, facet_key);
|
||||
let original_string = self
|
||||
.index
|
||||
.field_id_docid_facet_strings
|
||||
.get(self.rtxn, &key)?
|
||||
.unwrap()
|
||||
.to_owned();
|
||||
let optional_original_string =
|
||||
self.index.field_id_docid_facet_strings.get(self.rtxn, &key)?;
|
||||
|
||||
let original_string = match optional_original_string {
|
||||
Some(original_string) => original_string.to_owned(),
|
||||
None => {
|
||||
tracing::error!(
|
||||
"Missing original facet string. Using the normalized facet {} instead",
|
||||
facet_key
|
||||
);
|
||||
facet_key.to_string()
|
||||
}
|
||||
};
|
||||
|
||||
distribution.insert(original_string, nbr_docids);
|
||||
if distribution.len() == self.max_values_per_facet {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
use rayon::{ThreadPool, ThreadPoolBuilder};
|
||||
@@ -9,6 +9,8 @@ use thiserror::Error;
|
||||
#[derive(Debug)]
|
||||
pub struct ThreadPoolNoAbort {
|
||||
thread_pool: ThreadPool,
|
||||
/// The number of active operations.
|
||||
active_operations: AtomicUsize,
|
||||
/// Set to true if the thread pool catched a panic.
|
||||
pool_catched_panic: Arc<AtomicBool>,
|
||||
}
|
||||
@@ -19,7 +21,9 @@ impl ThreadPoolNoAbort {
|
||||
OP: FnOnce() -> R + Send,
|
||||
R: Send,
|
||||
{
|
||||
self.active_operations.fetch_add(1, Ordering::Relaxed);
|
||||
let output = self.thread_pool.install(op);
|
||||
self.active_operations.fetch_sub(1, Ordering::Relaxed);
|
||||
// While reseting the pool panic catcher we return an error if we catched one.
|
||||
if self.pool_catched_panic.swap(false, Ordering::SeqCst) {
|
||||
Err(PanicCatched)
|
||||
@@ -31,6 +35,11 @@ impl ThreadPoolNoAbort {
|
||||
pub fn current_num_threads(&self) -> usize {
|
||||
self.thread_pool.current_num_threads()
|
||||
}
|
||||
|
||||
/// The number of active operations.
|
||||
pub fn active_operations(&self) -> usize {
|
||||
self.active_operations.load(Ordering::Relaxed)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
@@ -64,6 +73,10 @@ impl ThreadPoolNoAbortBuilder {
|
||||
let catched_panic = pool_catched_panic.clone();
|
||||
move |_result| catched_panic.store(true, Ordering::SeqCst)
|
||||
});
|
||||
Ok(ThreadPoolNoAbort { thread_pool: self.0.build()?, pool_catched_panic })
|
||||
Ok(ThreadPoolNoAbort {
|
||||
thread_pool: self.0.build()?,
|
||||
active_operations: AtomicUsize::new(0),
|
||||
pool_catched_panic,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,6 +27,12 @@ use crate::update::new::KvReaderFieldId;
|
||||
use crate::vector::Embedding;
|
||||
use crate::{CboRoaringBitmapCodec, DocumentId, Error, Index, InternalError};
|
||||
|
||||
/// Note that the FrameProducer requires up to 9 bytes to
|
||||
/// encode the length, the max grant has been computed accordingly.
|
||||
///
|
||||
/// <https://docs.rs/bbqueue/latest/bbqueue/framed/index.html#frame-header>
|
||||
const MAX_FRAME_HEADER_SIZE: usize = 9;
|
||||
|
||||
/// Creates a tuple of senders/receiver to be used by
|
||||
/// the extractors and the writer loop.
|
||||
///
|
||||
@@ -53,8 +59,9 @@ pub fn extractor_writer_bbqueue(
|
||||
bbbuffers.resize_with(current_num_threads, || BBBuffer::new(bbbuffer_capacity));
|
||||
|
||||
let capacity = bbbuffers.first().unwrap().capacity();
|
||||
// Read the field description to understand this
|
||||
let capacity = capacity.checked_sub(9).unwrap();
|
||||
// 1. Due to fragmentation in the bbbuffer, we can only accept up to half the capacity in a single message.
|
||||
// 2. Read the documentation for `MAX_FRAME_HEADER_SIZE` for more information about why it is here.
|
||||
let max_grant = capacity.saturating_div(2).checked_sub(MAX_FRAME_HEADER_SIZE).unwrap();
|
||||
|
||||
let producers = ThreadLocal::with_capacity(bbbuffers.len());
|
||||
let consumers = rayon::broadcast(|bi| {
|
||||
@@ -65,7 +72,7 @@ pub fn extractor_writer_bbqueue(
|
||||
});
|
||||
|
||||
let (sender, receiver) = flume::bounded(channel_capacity);
|
||||
let sender = ExtractorBbqueueSender { sender, producers, capacity };
|
||||
let sender = ExtractorBbqueueSender { sender, producers, max_grant };
|
||||
let receiver = WriterBbqueueReceiver {
|
||||
receiver,
|
||||
look_at_consumer: (0..consumers.len()).cycle(),
|
||||
@@ -81,13 +88,10 @@ pub struct ExtractorBbqueueSender<'a> {
|
||||
/// A memory buffer, one by thread, is used to serialize
|
||||
/// the entries directly in this shared, lock-free space.
|
||||
producers: ThreadLocal<FullySend<RefCell<FrameProducer<'a>>>>,
|
||||
/// The capacity of this frame producer, will never be able to store more than that.
|
||||
///
|
||||
/// Note that the FrameProducer requires up to 9 bytes to encode the length,
|
||||
/// the capacity has been shrunk accordingly.
|
||||
///
|
||||
/// <https://docs.rs/bbqueue/latest/bbqueue/framed/index.html#frame-header>
|
||||
capacity: usize,
|
||||
/// The maximum frame grant that a producer can reserve.
|
||||
/// It will never be able to store more than that as the
|
||||
/// buffer cannot split data into two parts.
|
||||
max_grant: usize,
|
||||
}
|
||||
|
||||
pub struct WriterBbqueueReceiver<'a> {
|
||||
@@ -443,14 +447,14 @@ impl<'b> ExtractorBbqueueSender<'b> {
|
||||
}
|
||||
|
||||
fn delete_vector(&self, docid: DocumentId) -> crate::Result<()> {
|
||||
let capacity = self.capacity;
|
||||
let max_grant = self.max_grant;
|
||||
let refcell = self.producers.get().unwrap();
|
||||
let mut producer = refcell.0.borrow_mut_or_yield();
|
||||
|
||||
let payload_header = EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid });
|
||||
let total_length = EntryHeader::total_delete_vector_size();
|
||||
if total_length > capacity {
|
||||
panic!("The entry is larger ({total_length} bytes) than the BBQueue capacity ({capacity} bytes)");
|
||||
if total_length > max_grant {
|
||||
panic!("The entry is larger ({total_length} bytes) than the BBQueue max grant ({max_grant} bytes)");
|
||||
}
|
||||
|
||||
// Spin loop to have a frame the size we requested.
|
||||
@@ -468,7 +472,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
|
||||
embedder_id: u8,
|
||||
embeddings: &[Vec<f32>],
|
||||
) -> crate::Result<()> {
|
||||
let capacity = self.capacity;
|
||||
let max_grant = self.max_grant;
|
||||
let refcell = self.producers.get().unwrap();
|
||||
let mut producer = refcell.0.borrow_mut_or_yield();
|
||||
|
||||
@@ -479,7 +483,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
|
||||
let arroy_set_vector = ArroySetVectors { docid, embedder_id, _padding: [0; 3] };
|
||||
let payload_header = EntryHeader::ArroySetVectors(arroy_set_vector);
|
||||
let total_length = EntryHeader::total_set_vectors_size(embeddings.len(), dimensions);
|
||||
if total_length > capacity {
|
||||
if total_length > max_grant {
|
||||
let mut value_file = tempfile::tempfile().map(BufWriter::new)?;
|
||||
for embedding in embeddings {
|
||||
let mut embedding_bytes = bytemuck::cast_slice(embedding);
|
||||
@@ -540,14 +544,14 @@ impl<'b> ExtractorBbqueueSender<'b> {
|
||||
where
|
||||
F: FnOnce(&mut [u8], &mut [u8]) -> crate::Result<()>,
|
||||
{
|
||||
let capacity = self.capacity;
|
||||
let max_grant = self.max_grant;
|
||||
let refcell = self.producers.get().unwrap();
|
||||
let mut producer = refcell.0.borrow_mut_or_yield();
|
||||
|
||||
let operation = DbOperation { database, key_length: Some(key_length) };
|
||||
let payload_header = EntryHeader::DbOperation(operation);
|
||||
let total_length = EntryHeader::total_key_value_size(key_length, value_length);
|
||||
if total_length > capacity {
|
||||
if total_length > max_grant {
|
||||
let mut key_buffer = vec![0; key_length.get() as usize].into_boxed_slice();
|
||||
let value_file = tempfile::tempfile()?;
|
||||
value_file.set_len(value_length.try_into().unwrap())?;
|
||||
@@ -601,7 +605,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
|
||||
where
|
||||
F: FnOnce(&mut [u8]) -> crate::Result<()>,
|
||||
{
|
||||
let capacity = self.capacity;
|
||||
let max_grant = self.max_grant;
|
||||
let refcell = self.producers.get().unwrap();
|
||||
let mut producer = refcell.0.borrow_mut_or_yield();
|
||||
|
||||
@@ -610,8 +614,8 @@ impl<'b> ExtractorBbqueueSender<'b> {
|
||||
let operation = DbOperation { database, key_length: None };
|
||||
let payload_header = EntryHeader::DbOperation(operation);
|
||||
let total_length = EntryHeader::total_key_size(key_length);
|
||||
if total_length > capacity {
|
||||
panic!("The entry is larger ({total_length} bytes) than the BBQueue capacity ({capacity} bytes)");
|
||||
if total_length > max_grant {
|
||||
panic!("The entry is larger ({total_length} bytes) than the BBQueue max grant ({max_grant} bytes)");
|
||||
}
|
||||
|
||||
// Spin loop to have a frame the size we requested.
|
||||
|
||||
@@ -283,42 +283,60 @@ impl FacetedDocidsExtractor {
|
||||
}
|
||||
|
||||
struct DelAddFacetValue<'doc> {
|
||||
strings: HashMap<(FieldId, BVec<'doc, u8>), DelAdd, hashbrown::DefaultHashBuilder, &'doc Bump>,
|
||||
strings: HashMap<
|
||||
(FieldId, &'doc str),
|
||||
Option<BVec<'doc, u8>>,
|
||||
hashbrown::DefaultHashBuilder,
|
||||
&'doc Bump,
|
||||
>,
|
||||
f64s: HashMap<(FieldId, BVec<'doc, u8>), DelAdd, hashbrown::DefaultHashBuilder, &'doc Bump>,
|
||||
doc_alloc: &'doc Bump,
|
||||
}
|
||||
|
||||
impl<'doc> DelAddFacetValue<'doc> {
|
||||
fn new(doc_alloc: &'doc Bump) -> Self {
|
||||
Self { strings: HashMap::new_in(doc_alloc), f64s: HashMap::new_in(doc_alloc) }
|
||||
Self { strings: HashMap::new_in(doc_alloc), f64s: HashMap::new_in(doc_alloc), doc_alloc }
|
||||
}
|
||||
|
||||
fn insert_add(&mut self, fid: FieldId, value: BVec<'doc, u8>, kind: FacetKind) {
|
||||
let cache = match kind {
|
||||
FacetKind::String => &mut self.strings,
|
||||
FacetKind::Number => &mut self.f64s,
|
||||
_ => return,
|
||||
};
|
||||
|
||||
let key = (fid, value);
|
||||
if let Some(DelAdd::Deletion) = cache.get(&key) {
|
||||
cache.remove(&key);
|
||||
} else {
|
||||
cache.insert(key, DelAdd::Addition);
|
||||
match kind {
|
||||
FacetKind::Number => {
|
||||
let key = (fid, value);
|
||||
if let Some(DelAdd::Deletion) = self.f64s.get(&key) {
|
||||
self.f64s.remove(&key);
|
||||
} else {
|
||||
self.f64s.insert(key, DelAdd::Addition);
|
||||
}
|
||||
}
|
||||
FacetKind::String => {
|
||||
if let Ok(s) = std::str::from_utf8(&value) {
|
||||
let normalized = crate::normalize_facet(s);
|
||||
let truncated = self.doc_alloc.alloc_str(truncate_str(&normalized));
|
||||
self.strings.insert((fid, truncated), Some(value));
|
||||
}
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
|
||||
fn insert_del(&mut self, fid: FieldId, value: BVec<'doc, u8>, kind: FacetKind) {
|
||||
let cache = match kind {
|
||||
FacetKind::String => &mut self.strings,
|
||||
FacetKind::Number => &mut self.f64s,
|
||||
_ => return,
|
||||
};
|
||||
|
||||
let key = (fid, value);
|
||||
if let Some(DelAdd::Addition) = cache.get(&key) {
|
||||
cache.remove(&key);
|
||||
} else {
|
||||
cache.insert(key, DelAdd::Deletion);
|
||||
match kind {
|
||||
FacetKind::Number => {
|
||||
let key = (fid, value);
|
||||
if let Some(DelAdd::Addition) = self.f64s.get(&key) {
|
||||
self.f64s.remove(&key);
|
||||
} else {
|
||||
self.f64s.insert(key, DelAdd::Deletion);
|
||||
}
|
||||
}
|
||||
FacetKind::String => {
|
||||
if let Ok(s) = std::str::from_utf8(&value) {
|
||||
let normalized = crate::normalize_facet(s);
|
||||
let truncated = self.doc_alloc.alloc_str(truncate_str(&normalized));
|
||||
self.strings.insert((fid, truncated), None);
|
||||
}
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -329,18 +347,14 @@ impl<'doc> DelAddFacetValue<'doc> {
|
||||
doc_alloc: &Bump,
|
||||
) -> crate::Result<()> {
|
||||
let mut buffer = bumpalo::collections::Vec::new_in(doc_alloc);
|
||||
for ((fid, value), deladd) in self.strings {
|
||||
if let Ok(s) = std::str::from_utf8(&value) {
|
||||
buffer.clear();
|
||||
buffer.extend_from_slice(&fid.to_be_bytes());
|
||||
buffer.extend_from_slice(&docid.to_be_bytes());
|
||||
let normalized = crate::normalize_facet(s);
|
||||
let truncated = truncate_str(&normalized);
|
||||
buffer.extend_from_slice(truncated.as_bytes());
|
||||
match deladd {
|
||||
DelAdd::Deletion => sender.delete_facet_string(&buffer)?,
|
||||
DelAdd::Addition => sender.write_facet_string(&buffer, &value)?,
|
||||
}
|
||||
for ((fid, truncated), value) in self.strings {
|
||||
buffer.clear();
|
||||
buffer.extend_from_slice(&fid.to_be_bytes());
|
||||
buffer.extend_from_slice(&docid.to_be_bytes());
|
||||
buffer.extend_from_slice(truncated.as_bytes());
|
||||
match &value {
|
||||
Some(value) => sender.write_facet_string(&buffer, value)?,
|
||||
None => sender.delete_facet_string(&buffer)?,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -93,17 +93,25 @@ where
|
||||
..grenad_parameters
|
||||
};
|
||||
|
||||
// We compute and remove the allocated BBQueues buffers capacity from the indexing memory.
|
||||
let minimum_capacity = 50 * 1024 * 1024 * pool.current_num_threads(); // 50 MiB
|
||||
// 5% percent of the allocated memory for the extractors, or min 100MiB
|
||||
// 5% percent of the allocated memory for the bbqueues, or min 50MiB
|
||||
//
|
||||
// Minimum capacity for bbqueues
|
||||
let minimum_total_bbbuffer_capacity = 50 * 1024 * 1024 * pool.current_num_threads(); // 50 MiB
|
||||
let minimum_total_extractors_capacity = minimum_total_bbbuffer_capacity * 2;
|
||||
|
||||
let (grenad_parameters, total_bbbuffer_capacity) = grenad_parameters.max_memory.map_or(
|
||||
(grenad_parameters, 2 * minimum_capacity), // 100 MiB by thread by default
|
||||
(
|
||||
GrenadParameters {
|
||||
max_memory: Some(minimum_total_extractors_capacity),
|
||||
..grenad_parameters
|
||||
},
|
||||
minimum_total_bbbuffer_capacity,
|
||||
), // 100 MiB by thread by default
|
||||
|max_memory| {
|
||||
// 2% of the indexing memory
|
||||
let total_bbbuffer_capacity = (max_memory / 100 / 2).max(minimum_capacity);
|
||||
let total_bbbuffer_capacity = max_memory.max(minimum_total_bbbuffer_capacity);
|
||||
let new_grenad_parameters = GrenadParameters {
|
||||
max_memory: Some(
|
||||
max_memory.saturating_sub(total_bbbuffer_capacity).max(100 * 1024 * 1024),
|
||||
),
|
||||
max_memory: Some(max_memory.max(minimum_total_extractors_capacity)),
|
||||
..grenad_parameters
|
||||
};
|
||||
(new_grenad_parameters, total_bbbuffer_capacity)
|
||||
|
||||
@@ -5,7 +5,7 @@ use rayon::slice::ParallelSlice as _;
|
||||
|
||||
use super::error::{EmbedError, EmbedErrorKind, NewEmbedderError, NewEmbedderErrorKind};
|
||||
use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions};
|
||||
use super::DistributionShift;
|
||||
use super::{DistributionShift, REQUEST_PARALLELISM};
|
||||
use crate::error::FaultSource;
|
||||
use crate::vector::Embedding;
|
||||
use crate::ThreadPoolNoAbort;
|
||||
@@ -113,20 +113,30 @@ impl Embedder {
|
||||
texts: &[&str],
|
||||
threads: &ThreadPoolNoAbort,
|
||||
) -> Result<Vec<Vec<f32>>, EmbedError> {
|
||||
threads
|
||||
.install(move || {
|
||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||
.par_chunks(self.prompt_count_in_chunk_hint())
|
||||
.map(move |chunk| self.embed(chunk, None))
|
||||
.collect();
|
||||
if threads.active_operations() >= REQUEST_PARALLELISM {
|
||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||
.chunks(self.prompt_count_in_chunk_hint())
|
||||
.map(move |chunk| self.embed(chunk, None))
|
||||
.collect();
|
||||
|
||||
let embeddings = embeddings?;
|
||||
Ok(embeddings.into_iter().flatten().collect())
|
||||
})
|
||||
.map_err(|error| EmbedError {
|
||||
kind: EmbedErrorKind::PanicInThreadPool(error),
|
||||
fault: FaultSource::Bug,
|
||||
})?
|
||||
let embeddings = embeddings?;
|
||||
Ok(embeddings.into_iter().flatten().collect())
|
||||
} else {
|
||||
threads
|
||||
.install(move || {
|
||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||
.par_chunks(self.prompt_count_in_chunk_hint())
|
||||
.map(move |chunk| self.embed(chunk, None))
|
||||
.collect();
|
||||
|
||||
let embeddings = embeddings?;
|
||||
Ok(embeddings.into_iter().flatten().collect())
|
||||
})
|
||||
.map_err(|error| EmbedError {
|
||||
kind: EmbedErrorKind::PanicInThreadPool(error),
|
||||
fault: FaultSource::Bug,
|
||||
})?
|
||||
}
|
||||
}
|
||||
|
||||
pub fn chunk_count_hint(&self) -> usize {
|
||||
|
||||
@@ -6,7 +6,7 @@ use rayon::slice::ParallelSlice as _;
|
||||
|
||||
use super::error::{EmbedError, NewEmbedderError};
|
||||
use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions};
|
||||
use super::DistributionShift;
|
||||
use super::{DistributionShift, REQUEST_PARALLELISM};
|
||||
use crate::error::FaultSource;
|
||||
use crate::vector::error::EmbedErrorKind;
|
||||
use crate::vector::Embedding;
|
||||
@@ -270,20 +270,29 @@ impl Embedder {
|
||||
texts: &[&str],
|
||||
threads: &ThreadPoolNoAbort,
|
||||
) -> Result<Vec<Vec<f32>>, EmbedError> {
|
||||
threads
|
||||
.install(move || {
|
||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||
.par_chunks(self.prompt_count_in_chunk_hint())
|
||||
.map(move |chunk| self.embed(chunk, None))
|
||||
.collect();
|
||||
if threads.active_operations() >= REQUEST_PARALLELISM {
|
||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||
.chunks(self.prompt_count_in_chunk_hint())
|
||||
.map(move |chunk| self.embed(chunk, None))
|
||||
.collect();
|
||||
let embeddings = embeddings?;
|
||||
Ok(embeddings.into_iter().flatten().collect())
|
||||
} else {
|
||||
threads
|
||||
.install(move || {
|
||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||
.par_chunks(self.prompt_count_in_chunk_hint())
|
||||
.map(move |chunk| self.embed(chunk, None))
|
||||
.collect();
|
||||
|
||||
let embeddings = embeddings?;
|
||||
Ok(embeddings.into_iter().flatten().collect())
|
||||
})
|
||||
.map_err(|error| EmbedError {
|
||||
kind: EmbedErrorKind::PanicInThreadPool(error),
|
||||
fault: FaultSource::Bug,
|
||||
})?
|
||||
let embeddings = embeddings?;
|
||||
Ok(embeddings.into_iter().flatten().collect())
|
||||
})
|
||||
.map_err(|error| EmbedError {
|
||||
kind: EmbedErrorKind::PanicInThreadPool(error),
|
||||
fault: FaultSource::Bug,
|
||||
})?
|
||||
}
|
||||
}
|
||||
|
||||
pub fn chunk_count_hint(&self) -> usize {
|
||||
|
||||
@@ -203,20 +203,30 @@ impl Embedder {
|
||||
texts: &[&str],
|
||||
threads: &ThreadPoolNoAbort,
|
||||
) -> Result<Vec<Embedding>, EmbedError> {
|
||||
threads
|
||||
.install(move || {
|
||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||
.par_chunks(self.prompt_count_in_chunk_hint())
|
||||
.map(move |chunk| self.embed_ref(chunk, None))
|
||||
.collect();
|
||||
if threads.active_operations() >= REQUEST_PARALLELISM {
|
||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||
.chunks(self.prompt_count_in_chunk_hint())
|
||||
.map(move |chunk| self.embed_ref(chunk, None))
|
||||
.collect();
|
||||
|
||||
let embeddings = embeddings?;
|
||||
Ok(embeddings.into_iter().flatten().collect())
|
||||
})
|
||||
.map_err(|error| EmbedError {
|
||||
kind: EmbedErrorKind::PanicInThreadPool(error),
|
||||
fault: FaultSource::Bug,
|
||||
})?
|
||||
let embeddings = embeddings?;
|
||||
Ok(embeddings.into_iter().flatten().collect())
|
||||
} else {
|
||||
threads
|
||||
.install(move || {
|
||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||
.par_chunks(self.prompt_count_in_chunk_hint())
|
||||
.map(move |chunk| self.embed_ref(chunk, None))
|
||||
.collect();
|
||||
|
||||
let embeddings = embeddings?;
|
||||
Ok(embeddings.into_iter().flatten().collect())
|
||||
})
|
||||
.map_err(|error| EmbedError {
|
||||
kind: EmbedErrorKind::PanicInThreadPool(error),
|
||||
fault: FaultSource::Bug,
|
||||
})?
|
||||
}
|
||||
}
|
||||
|
||||
pub fn chunk_count_hint(&self) -> usize {
|
||||
|
||||
Reference in New Issue
Block a user