Compare commits

...

7 Commits

11 changed files with 76 additions and 33 deletions

Cargo.lock (generated, 51 changed lines)
View File

@@ -700,8 +700,7 @@ dependencies = [
 [[package]]
 name = "charabia"
 version = "0.8.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "098219a776307414866165a03a9cc68c1578764fe3616fe979e1c280790ddd73"
+source = "git+https://github.com/meilisearch/charabia?branch=main#5c3d09a7127dcf5e0e5d94d991c4d3d5ef4768cc"
 dependencies = [
  "aho-corasick",
  "cow-utils",
@@ -2177,9 +2176,9 @@ dependencies = [
 
 [[package]]
 name = "lindera-cc-cedict-builder"
-version = "0.27.0"
+version = "0.27.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2d2e8f2ca97ddf952fe340642511b9c14b373cb2eef711d526bb8ef2ca0969b8"
+checksum = "6f567a47e47b5420908424de2c6c5e424e3cafe588d0146bd128c0f3755758a3"
 dependencies = [
  "anyhow",
  "bincode",
@@ -2196,9 +2195,9 @@ dependencies = [
 
 [[package]]
 name = "lindera-compress"
-version = "0.27.0"
+version = "0.27.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f72b460559bcbe8a9cee85ea4a5056133ed3abf373031191589236e656d65b59"
+checksum = "49f3e553d55ebe9881fa5e5de588b0a153456e93564d17dfbef498912caf63a2"
 dependencies = [
  "anyhow",
  "flate2",
@@ -2207,9 +2206,9 @@ dependencies = [
 
 [[package]]
 name = "lindera-core"
-version = "0.27.0"
+version = "0.27.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f586eb8a9393c32d5525e0e9336a3727bd1329674740097126f3b0bff8a1a1ea"
+checksum = "a9a2440cc156a4a911a174ec68203543d1efb10df3a700a59b6bf581e453c726"
 dependencies = [
  "anyhow",
  "bincode",
@@ -2224,9 +2223,9 @@ dependencies = [
 
 [[package]]
 name = "lindera-decompress"
-version = "0.27.0"
+version = "0.27.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1fb1facd8da698072fcc7338bd757730db53d59f313f44dd583fa03681dcc0e1"
+checksum = "e077a410e61c962cb526f71b7effd62ffc607488a8f61869c937582d2ccb529b"
 dependencies = [
  "anyhow",
  "flate2",
@@ -2235,9 +2234,9 @@ dependencies = [
 
 [[package]]
 name = "lindera-dictionary"
-version = "0.27.0"
+version = "0.27.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ec7be7410b1da7017a8948986b87af67082f605e9a716f0989790d795d677f0c"
+checksum = "d9f57491adf7b311a3ee87f5e4a36454df16a2ec73de4ef28b2106fac80bd782"
 dependencies = [
  "anyhow",
  "bincode",
@@ -2255,9 +2254,9 @@ dependencies = [
 
 [[package]]
 name = "lindera-ipadic-builder"
-version = "0.27.0"
+version = "0.27.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "705d07f8a45d04fd95149f7ad41a26d1f9e56c9c00402be6f9dd05e3d88b99c6"
+checksum = "a3476ec7748aebd2eb23d496ddfce5e7e0a5c031cffcd214451043e02d029f11"
 dependencies = [
  "anyhow",
  "bincode",
@@ -2276,9 +2275,9 @@ dependencies = [
 
 [[package]]
 name = "lindera-ipadic-neologd-builder"
-version = "0.27.0"
+version = "0.27.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "633a93983ba13fba42328311a501091bd4a7aff0c94ae9eaa9d4733dd2b0468a"
+checksum = "7b1c7576a02d5e4af2bf62de51790a01bc4b8bc0d0b6a6b86a46b157f5cb306d"
 dependencies = [
  "anyhow",
  "bincode",
@@ -2297,9 +2296,9 @@ dependencies = [
 
 [[package]]
 name = "lindera-ko-dic"
-version = "0.27.0"
+version = "0.27.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a428e0d316b6c86f51bd919479692bc41ad840dba266ebc044663970f431ea18"
+checksum = "b713ecd5b827d7d448c3c5eb3c6d5899ecaf22cd17087599996349a02c76828d"
 dependencies = [
  "bincode",
  "byteorder",
@@ -2314,9 +2313,9 @@ dependencies = [
 
 [[package]]
 name = "lindera-ko-dic-builder"
-version = "0.27.0"
+version = "0.27.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2a5288704c6b8a069c0a1705c38758e836497698b50453373ab3d56c6f9a7ef8"
+checksum = "3e545752f6487be87b572529ad594cb3b48d2ef20821516f598b2d152d23277b"
 dependencies = [
  "anyhow",
  "bincode",
@@ -2334,9 +2333,9 @@ dependencies = [
 
 [[package]]
 name = "lindera-tokenizer"
-version = "0.27.0"
+version = "0.27.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "106ba439b2e87529d9bbedbb88d69f635baba1195c26502b308f55a85885fc81"
+checksum = "24a2d4606a5a4da62ac4a3680ee884a75da7f0c892dc967fc9cb983ceba39a8f"
 dependencies = [
  "bincode",
  "byteorder",
@@ -2349,9 +2348,9 @@ dependencies = [
 
 [[package]]
 name = "lindera-unidic"
-version = "0.27.0"
+version = "0.27.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3399b6dcfe1701333451d184ff3c677f433b320153427b146360c9e4bd8cb816"
+checksum = "388b1bdf81794b5d5b8057ce0321c58ff4b90d676b637948ccc7863ae2f43d28"
 dependencies = [
  "bincode",
  "byteorder",
@@ -2366,9 +2365,9 @@ dependencies = [
 
 [[package]]
 name = "lindera-unidic-builder"
-version = "0.27.0"
+version = "0.27.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b698227fdaeac32289173ab389b990d4eb00a40cbc9912020f69a0c491dabf55"
+checksum = "cdfa3e29a22c047da57fadd960ff674b720de15a1e2fb35b5ed67f3408afb469"
 dependencies = [
  "anyhow",
  "bincode",

View File

@@ -539,7 +539,9 @@ impl IndexScheduler {
         let index_tasks = self.index_tasks(rtxn, index_name)? & enqueued;
 
         // If autobatching is disabled we only take one task at a time.
-        let tasks_limit = if self.autobatching_enabled { usize::MAX } else { 1 };
+        // Otherwise, we take only a maximum of tasks to create batches.
+        let tasks_limit =
+            if self.autobatching_enabled { self.maximum_number_of_batched_tasks } else { 1 };
 
         let enqueued = index_tasks
             .into_iter()
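In short, the previously hard-coded `usize::MAX` cap becomes configurable. A minimal standalone sketch of the capping behaviour (a hypothetical free function for illustration, using the names from the diff, not Meilisearch's actual API):

    /// How many enqueued tasks may be drained into a single batch.
    fn tasks_limit(autobatching_enabled: bool, maximum_number_of_batched_tasks: usize) -> usize {
        // Autobatching off: exactly one task per batch.
        // Autobatching on: at most the configured number of tasks.
        if autobatching_enabled { maximum_number_of_batched_tasks } else { 1 }
    }

    fn main() {
        let enqueued = vec![1_u32, 2, 3, 4, 5]; // stand-in for enqueued task ids
        let batch: Vec<u32> = enqueued.into_iter().take(tasks_limit(true, 3)).collect();
        assert_eq!(batch, vec![1, 2, 3]); // the batch is capped at 3 tasks
    }

The default (see the option changes below) is `usize::MAX`, so behaviour is unchanged unless the experimental flag is set.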

View File

@@ -15,6 +15,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
     let IndexScheduler {
         autobatching_enabled,
+        maximum_number_of_batched_tasks: _,
         must_stop_processing: _,
         processing_tasks,
         file_store,
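The snapshot function destructures the scheduler exhaustively, so adding a field without deciding how to snapshot it becomes a compile error; `: _` marks a field as deliberately ignored. A minimal sketch of the pattern (hypothetical two-field struct, not the real `IndexScheduler`):

    struct Scheduler {
        autobatching_enabled: bool,
        maximum_number_of_batched_tasks: usize,
    }

    fn snapshot(s: &Scheduler) -> String {
        // Exhaustive pattern: a new field on `Scheduler` fails to compile here
        // until it is either snapshotted or explicitly ignored with `: _`.
        let Scheduler { autobatching_enabled, maximum_number_of_batched_tasks: _ } = s;
        format!("autobatching_enabled: {autobatching_enabled}\n")
    }

    fn main() {
        let s = Scheduler { autobatching_enabled: true, maximum_number_of_batched_tasks: 100 };
        print!("{}", snapshot(&s));
    }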

View File

@@ -253,6 +253,9 @@ pub struct IndexSchedulerOptions {
     /// Set to `true` iff the index scheduler is allowed to automatically
     /// batch tasks together, to process multiple tasks at once.
     pub autobatching_enabled: bool,
+    /// If the autobatcher is allowed to automatically batch tasks
+    /// it will only batch this defined number of tasks at once.
+    pub maximum_number_of_batched_tasks: usize,
     /// The maximum number of tasks stored in the task queue before starting
     /// to auto schedule task deletions.
     pub max_number_of_tasks: usize,
@@ -310,6 +313,9 @@ pub struct IndexScheduler {
     /// Whether auto-batching is enabled or not.
     pub(crate) autobatching_enabled: bool,
 
+    /// The maximum number of tasks that will be batched together.
+    pub(crate) maximum_number_of_batched_tasks: usize,
+
     /// The max number of tasks allowed before the scheduler starts to delete
     /// the finished tasks automatically.
     pub(crate) max_number_of_tasks: usize,
@@ -363,6 +369,7 @@ impl IndexScheduler {
             index_mapper: self.index_mapper.clone(),
             wake_up: self.wake_up.clone(),
             autobatching_enabled: self.autobatching_enabled,
+            maximum_number_of_batched_tasks: self.maximum_number_of_batched_tasks,
             max_number_of_tasks: self.max_number_of_tasks,
             snapshots_path: self.snapshots_path.clone(),
             dumps_path: self.dumps_path.clone(),
@@ -458,6 +465,7 @@ impl IndexScheduler {
             // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
             wake_up: Arc::new(SignalEvent::auto(true)),
             autobatching_enabled: options.autobatching_enabled,
+            maximum_number_of_batched_tasks: options.maximum_number_of_batched_tasks,
             max_number_of_tasks: options.max_number_of_tasks,
             dumps_path: options.dumps_path,
             snapshots_path: options.snapshots_path,
@@ -1589,6 +1597,7 @@ mod tests {
             index_count: 5,
             indexer_config,
             autobatching_enabled: true,
+            maximum_number_of_batched_tasks: usize::MAX,
             max_number_of_tasks: 1_000_000,
             instance_features: Default::default(),
         };

View File

@@ -133,7 +133,7 @@ vergen = { version = "7.5.1", default-features = false, features = ["git"] }
 zip = { version = "0.6.4", optional = true }
 
 [features]
-default = ["analytics", "meilisearch-types/all-tokenizations", "mini-dashboard"]
+default = ["analytics", "meilisearch-types/all-tokenizations", "mini-dashboard", "profile-with-puffin"]
 analytics = ["segment"]
 profile-with-puffin = ["dep:puffin_http"]
 mini-dashboard = [
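Enabling `profile-with-puffin` by default pulls in the optional `puffin_http` dependency, which serves profiling frames over HTTP to a viewer such as `puffin_viewer`. A sketch of the usual wiring, based on `puffin_http`'s documented API rather than Meilisearch's actual startup code:

    fn main() {
        // Serve puffin frames on the crate's default port (8585).
        let server_addr = format!("0.0.0.0:{}", puffin_http::DEFAULT_PORT);
        let _puffin_server = puffin_http::Server::new(&server_addr).unwrap();

        // Scopes recorded by puffin::profile_function!() are only captured while this is on.
        puffin::set_scopes_on(true);

        // ... run the application; mark frame boundaries as units of work complete ...
        puffin::GlobalProfiler::lock().new_frame();
    }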

View File

@@ -285,6 +285,7 @@ impl From<Opt> for Infos {
             db_path,
             experimental_enable_metrics,
             experimental_reduce_indexing_memory_usage,
+            experimental_limit_batched_tasks: _,
             http_addr,
             master_key: _,
             env,

View File

@@ -236,6 +236,7 @@ fn open_or_create_database_unchecked(
         enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
         indexer_config: (&opt.indexer_options).try_into()?,
         autobatching_enabled: true,
+        maximum_number_of_batched_tasks: opt.experimental_limit_batched_tasks,
         max_number_of_tasks: 1_000_000,
         index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,
         index_count: DEFAULT_INDEX_COUNT,

View File

@@ -51,6 +51,7 @@ const MEILI_LOG_LEVEL: &str = "MEILI_LOG_LEVEL";
 const MEILI_EXPERIMENTAL_ENABLE_METRICS: &str = "MEILI_EXPERIMENTAL_ENABLE_METRICS";
 const MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE: &str =
     "MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE";
+const MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS: &str = "MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS";
 
 const DEFAULT_CONFIG_FILE_PATH: &str = "./config.toml";
 const DEFAULT_DB_PATH: &str = "./data.ms";
@@ -301,6 +302,11 @@ pub struct Opt {
     #[serde(default)]
     pub experimental_reduce_indexing_memory_usage: bool,
 
+    /// Experimental limit to the number of tasks per batch
+    #[clap(long, env = MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS, default_value_t = default_limit_batched_tasks())]
+    #[serde(default = "default_limit_batched_tasks")]
+    pub experimental_limit_batched_tasks: usize,
+
     #[serde(flatten)]
     #[clap(flatten)]
     pub indexer_options: IndexerOpts,
@@ -393,7 +399,8 @@ impl Opt {
             #[cfg(all(not(debug_assertions), feature = "analytics"))]
             no_analytics,
             experimental_enable_metrics: enable_metrics_route,
-            experimental_reduce_indexing_memory_usage: reduce_indexing_memory_usage,
+            experimental_reduce_indexing_memory_usage,
+            experimental_limit_batched_tasks,
         } = self;
         export_to_env_if_not_present(MEILI_DB_PATH, db_path);
         export_to_env_if_not_present(MEILI_HTTP_ADDR, http_addr);
@@ -437,7 +444,11 @@
         );
         export_to_env_if_not_present(
             MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE,
-            reduce_indexing_memory_usage.to_string(),
+            experimental_reduce_indexing_memory_usage.to_string(),
         );
+        export_to_env_if_not_present(
+            MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS,
+            experimental_limit_batched_tasks.to_string(),
+        );
         indexer_options.export_to_env();
     }
@@ -739,6 +750,10 @@ fn default_dump_dir() -> PathBuf {
     PathBuf::from(DEFAULT_DUMP_DIR)
 }
 
+fn default_limit_batched_tasks() -> usize {
+    usize::MAX
+}
+
 /// Indicates if a snapshot was scheduled, and if yes with which interval.
 #[derive(Debug, Default, Copy, Clone, Deserialize, Serialize)]
 pub enum ScheduleSnapshot {
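The new `export_to_env_if_not_present` call follows the file's existing pattern: every resolved option is mirrored into the process environment unless the user already set the corresponding variable. A minimal re-implementation to illustrate the idea (the helper's real body is not shown in this diff):

    use std::env;

    // Mirror a resolved option into the environment, keeping any value the user set.
    fn export_to_env_if_not_present(key: &str, value: String) {
        if env::var_os(key).is_none() {
            env::set_var(key, value);
        }
    }

    fn main() {
        export_to_env_if_not_present("MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS", 100.to_string());
        assert_eq!(env::var("MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS").unwrap(), "100");
    }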

View File

@@ -17,7 +17,8 @@ bincode = "1.3.3"
 bstr = "1.4.0"
 bytemuck = { version = "1.13.1", features = ["extern_crate_alloc"] }
 byteorder = "1.4.3"
-charabia = { version = "0.8.3", default-features = false }
+# charabia = { version = "0.8.3", default-features = false }
+charabia = { git = "https://github.com/meilisearch/charabia", branch = "main", default-features = false }
 concat-arrays = "0.1.2"
 crossbeam-channel = "0.5.8"
 deserr = { version = "0.6.0", features = ["actix-web"]}

View File

@@ -108,15 +108,17 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
         self.delete_document(docid);
         Some(docid)
     }
 
-    pub fn execute(self) -> Result<DocumentDeletionResult> {
-        puffin::profile_function!();
+    pub fn execute(self) -> Result<DocumentDeletionResult> {
         let DetailedDocumentDeletionResult { deleted_documents, remaining_documents } =
             self.execute_inner()?;
 
         Ok(DocumentDeletionResult { deleted_documents, remaining_documents })
     }
 
     pub(crate) fn execute_inner(mut self) -> Result<DetailedDocumentDeletionResult> {
+        puffin::profile_function!();
+
         self.index.set_updated_at(self.wtxn, &OffsetDateTime::now_utc())?;
 
         // We retrieve the current documents ids that are in the database.
@@ -476,6 +478,8 @@ impl<'t, 'u, 'i> DeleteDocuments<'t, 'u, 'i> {
     C: for<'a> BytesDecode<'a, DItem = RoaringBitmap>
         + for<'a> BytesEncode<'a, EItem = RoaringBitmap>,
 {
+    puffin::profile_function!();
+
     while let Some(result) = iter.next() {
         let (bytes, mut docids) = result?;
         let previous_len = docids.len();
@@ -498,6 +502,8 @@ fn remove_from_word_prefix_docids(
     db: &Database<Str, RoaringBitmapCodec>,
     to_remove: &RoaringBitmap,
 ) -> Result<fst::Set<Vec<u8>>> {
+    puffin::profile_function!();
+
     let mut prefixes_to_delete = fst::SetBuilder::memory();
 
     // We iterate over the word prefix docids database and remove the deleted documents ids
@@ -528,6 +534,8 @@ fn remove_from_word_docids(
     words_to_keep: &mut BTreeSet<String>,
     words_to_remove: &mut BTreeSet<String>,
 ) -> Result<()> {
+    puffin::profile_function!();
+
     // We create an iterator to be able to get the content and delete the word docids.
     // It's faster to acquire a cursor to get and delete or put, as we avoid traversing
     // the LMDB B-Tree two times but only once.
@@ -559,6 +567,8 @@ fn remove_docids_from_field_id_docid_facet_value(
     field_id: FieldId,
     to_remove: &RoaringBitmap,
 ) -> heed::Result<HashSet<Vec<u8>>> {
+    puffin::profile_function!();
+
     let db = match facet_type {
         FacetType::String => {
             index.field_id_docid_facet_strings.remap_types::<ByteSlice, DecodeIgnore>()
@@ -594,6 +604,8 @@ fn remove_docids_from_facet_id_docids<'a, C>(
 where
     C: heed::BytesDecode<'a> + heed::BytesEncode<'a>,
 {
+    puffin::profile_function!();
+
     let mut iter = db.remap_key_type::<ByteSlice>().iter_mut(wtxn)?;
     while let Some(result) = iter.next() {
         let (bytes, mut docids) = result?;

View File

@@ -54,6 +54,8 @@ pub fn sorter_into_reader(
     sorter: grenad::Sorter<MergeFn>,
     indexer: GrenadParameters,
 ) -> Result<grenad::Reader<File>> {
+    puffin::profile_function!();
+
     let mut writer = create_writer(
         indexer.chunk_compression_type,
         indexer.chunk_compression_level,
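
The last two files instrument each helper with `puffin::profile_function!()`, which opens a profiling scope named after the enclosing function, so document deletion and grenad sorting show up per helper in the profiler. The pattern in isolation (assumes the `puffin` crate; `expensive_work` is a placeholder):

    fn expensive_work() {
        // Records a scope named after this function in the current profiling frame.
        puffin::profile_function!();
        std::thread::sleep(std::time::Duration::from_millis(10));
    }

    fn main() {
        puffin::set_scopes_on(true); // scopes are dropped unless profiling is on
        expensive_work();
        // Close the frame so the recorded scopes become visible to the viewer.
        puffin::GlobalProfiler::lock().new_frame();
    }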