Compare commits

...

8 Commits

Author SHA1 Message Date
f46c2de607 Fix the tests 2023-09-15 12:01:29 +02:00
388d78f70e update the description of the cli argument 2023-09-15 12:01:29 +02:00
b252900470 Expose a new flag to limit the number of batched tasks 2023-09-15 12:01:28 +02:00
8822ca234e Merge #4057
4057: Fix stats delete by filter for v1.3.4 r=irevoire a=curquiza

Fixes https://github.com/meilisearch/meilisearch/issues/4018 for v1.3.4

Co-authored-by: Tamo <tamo@meilisearch.com>
2023-09-12 13:34:39 +00:00
d23abc8771 fix clippy 2023-09-12 11:26:48 +02:00
036b846e4d Fix the stats of the documents deletion by filter
The issue was that the operation « DocumentDeletionByFilter » was not
declared as an index operation. That means the indexes stats were not
reprocessed after the application of the operation.
2023-09-12 11:26:41 +02:00
9889390d13 Merge #4055
4055: Update version for the next release (v1.3.4) in Cargo.toml r=curquiza a=meili-bot

⚠️ This PR is automatically generated. Check the new version is the expected one and Cargo.lock has been updated before merging.

Co-authored-by: curquiza <curquiza@users.noreply.github.com>
2023-09-11 17:04:31 +00:00
8e2bb29cf1 Update version for the next release (v1.3.4) in Cargo.toml 2023-09-11 16:20:03 +00:00
9 changed files with 146 additions and 82 deletions

28
Cargo.lock generated
View File

@ -469,7 +469,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b"
[[package]]
name = "benchmarks"
version = "1.3.3"
version = "1.3.4"
dependencies = [
"anyhow",
"bytes",
@ -1199,7 +1199,7 @@ dependencies = [
[[package]]
name = "dump"
version = "1.3.3"
version = "1.3.4"
dependencies = [
"anyhow",
"big_s",
@ -1413,7 +1413,7 @@ dependencies = [
[[package]]
name = "file-store"
version = "1.3.3"
version = "1.3.4"
dependencies = [
"faux",
"tempfile",
@ -1435,7 +1435,7 @@ dependencies = [
[[package]]
name = "filter-parser"
version = "1.3.3"
version = "1.3.4"
dependencies = [
"insta",
"nom",
@ -1455,7 +1455,7 @@ dependencies = [
[[package]]
name = "flatten-serde-json"
version = "1.3.3"
version = "1.3.4"
dependencies = [
"criterion",
"serde_json",
@ -1573,7 +1573,7 @@ dependencies = [
[[package]]
name = "fuzzers"
version = "1.3.3"
version = "1.3.4"
dependencies = [
"arbitrary",
"clap",
@ -1895,7 +1895,7 @@ dependencies = [
[[package]]
name = "index-scheduler"
version = "1.3.3"
version = "1.3.4"
dependencies = [
"anyhow",
"big_s",
@ -2082,7 +2082,7 @@ dependencies = [
[[package]]
name = "json-depth-checker"
version = "1.3.3"
version = "1.3.4"
dependencies = [
"criterion",
"serde_json",
@ -2494,7 +2494,7 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "meili-snap"
version = "1.3.3"
version = "1.3.4"
dependencies = [
"insta",
"md5",
@ -2503,7 +2503,7 @@ dependencies = [
[[package]]
name = "meilisearch"
version = "1.3.3"
version = "1.3.4"
dependencies = [
"actix-cors",
"actix-http",
@ -2592,7 +2592,7 @@ dependencies = [
[[package]]
name = "meilisearch-auth"
version = "1.3.3"
version = "1.3.4"
dependencies = [
"base64 0.21.2",
"enum-iterator",
@ -2611,7 +2611,7 @@ dependencies = [
[[package]]
name = "meilisearch-types"
version = "1.3.3"
version = "1.3.4"
dependencies = [
"actix-web",
"anyhow",
@ -2665,7 +2665,7 @@ dependencies = [
[[package]]
name = "milli"
version = "1.3.3"
version = "1.3.4"
dependencies = [
"big_s",
"bimap",
@ -2995,7 +2995,7 @@ checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
[[package]]
name = "permissive-json-pointer"
version = "1.3.3"
version = "1.3.4"
dependencies = [
"big_s",
"serde_json",

View File

@ -18,7 +18,7 @@ members = [
]
[workspace.package]
version = "1.3.3"
version = "1.3.4"
authors = ["Quentin de Quelen <quentin@dequelen.me>", "Clément Renault <clement@meilisearch.com>"]
description = "Meilisearch HTTP server"
homepage = "https://meilisearch.com"

View File

@ -67,10 +67,6 @@ pub(crate) enum Batch {
op: IndexOperation,
must_create_index: bool,
},
IndexDocumentDeletionByFilter {
index_uid: String,
task: Task,
},
IndexCreation {
index_uid: String,
primary_key: Option<String>,
@ -114,6 +110,10 @@ pub(crate) enum IndexOperation {
documents: Vec<Vec<String>>,
tasks: Vec<Task>,
},
IndexDocumentDeletionByFilter {
index_uid: String,
task: Task,
},
DocumentClear {
index_uid: String,
tasks: Vec<Task>,
@ -155,7 +155,6 @@ impl Batch {
| Batch::TaskDeletion(task)
| Batch::Dump(task)
| Batch::IndexCreation { task, .. }
| Batch::IndexDocumentDeletionByFilter { task, .. }
| Batch::IndexUpdate { task, .. } => vec![task.uid],
Batch::SnapshotCreation(tasks) | Batch::IndexDeletion { tasks, .. } => {
tasks.iter().map(|task| task.uid).collect()
@ -167,6 +166,7 @@ impl Batch {
| IndexOperation::DocumentClear { tasks, .. } => {
tasks.iter().map(|task| task.uid).collect()
}
IndexOperation::IndexDocumentDeletionByFilter { task, .. } => vec![task.uid],
IndexOperation::SettingsAndDocumentOperation {
document_import_tasks: tasks,
settings_tasks: other,
@ -194,8 +194,7 @@ impl Batch {
IndexOperation { op, .. } => Some(op.index_uid()),
IndexCreation { index_uid, .. }
| IndexUpdate { index_uid, .. }
| IndexDeletion { index_uid, .. }
| IndexDocumentDeletionByFilter { index_uid, .. } => Some(index_uid),
| IndexDeletion { index_uid, .. } => Some(index_uid),
}
}
}
@ -205,6 +204,7 @@ impl IndexOperation {
match self {
IndexOperation::DocumentOperation { index_uid, .. }
| IndexOperation::DocumentDeletion { index_uid, .. }
| IndexOperation::IndexDocumentDeletionByFilter { index_uid, .. }
| IndexOperation::DocumentClear { index_uid, .. }
| IndexOperation::Settings { index_uid, .. }
| IndexOperation::DocumentClearAndSetting { index_uid, .. }
@ -239,9 +239,12 @@ impl IndexScheduler {
let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
match &task.kind {
KindWithContent::DocumentDeletionByFilter { index_uid, .. } => {
Ok(Some(Batch::IndexDocumentDeletionByFilter {
index_uid: index_uid.clone(),
task,
Ok(Some(Batch::IndexOperation {
op: IndexOperation::IndexDocumentDeletionByFilter {
index_uid: index_uid.clone(),
task,
},
must_create_index: false,
}))
}
_ => unreachable!(),
@ -534,7 +537,9 @@ impl IndexScheduler {
let index_tasks = self.index_tasks(rtxn, index_name)? & enqueued;
// If autobatching is disabled we only take one task at a time.
let tasks_limit = if self.autobatching_enabled { usize::MAX } else { 1 };
// Otherwise, we take only a maximum of tasks to create batches.
let tasks_limit =
if self.autobatching_enabled { self.maximum_number_of_batched_tasks } else { 1 };
let enqueued = index_tasks
.into_iter()
@ -891,51 +896,6 @@ impl IndexScheduler {
Ok(tasks)
}
Batch::IndexDocumentDeletionByFilter { mut task, index_uid: _ } => {
let (index_uid, filter) =
if let KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr } =
&task.kind
{
(index_uid, filter_expr)
} else {
unreachable!()
};
let index = {
let rtxn = self.env.read_txn()?;
self.index_mapper.index(&rtxn, index_uid)?
};
let deleted_documents = delete_document_by_filter(filter, index);
let original_filter = if let Some(Details::DocumentDeletionByFilter {
original_filter,
deleted_documents: _,
}) = task.details
{
original_filter
} else {
// In the case of a `documentDeleteByFilter` the details MUST be set
unreachable!();
};
match deleted_documents {
Ok(deleted_documents) => {
task.status = Status::Succeeded;
task.details = Some(Details::DocumentDeletionByFilter {
original_filter,
deleted_documents: Some(deleted_documents),
});
}
Err(e) => {
task.status = Status::Failed;
task.details = Some(Details::DocumentDeletionByFilter {
original_filter,
deleted_documents: Some(0),
});
task.error = Some(e.into());
}
}
Ok(vec![task])
}
Batch::IndexCreation { index_uid, primary_key, task } => {
let wtxn = self.env.write_txn()?;
if self.index_mapper.exists(&wtxn, &index_uid)? {
@ -1292,6 +1252,47 @@ impl IndexScheduler {
Ok(tasks)
}
IndexOperation::IndexDocumentDeletionByFilter { mut task, index_uid: _ } => {
let filter =
if let KindWithContent::DocumentDeletionByFilter { filter_expr, .. } =
&task.kind
{
filter_expr
} else {
unreachable!()
};
let deleted_documents = delete_document_by_filter(index_wtxn, filter, index);
let original_filter = if let Some(Details::DocumentDeletionByFilter {
original_filter,
deleted_documents: _,
}) = task.details
{
original_filter
} else {
// In the case of a `documentDeleteByFilter` the details MUST be set
unreachable!();
};
match deleted_documents {
Ok(deleted_documents) => {
task.status = Status::Succeeded;
task.details = Some(Details::DocumentDeletionByFilter {
original_filter,
deleted_documents: Some(deleted_documents),
});
}
Err(e) => {
task.status = Status::Failed;
task.details = Some(Details::DocumentDeletionByFilter {
original_filter,
deleted_documents: Some(0),
});
task.error = Some(e.into());
}
}
Ok(vec![task])
}
IndexOperation::Settings { index_uid: _, settings, mut tasks } => {
let indexer_config = self.index_mapper.indexer_config();
let mut builder = milli::update::Settings::new(index_wtxn, index, indexer_config);
@ -1491,23 +1492,22 @@ impl IndexScheduler {
}
}
fn delete_document_by_filter(filter: &serde_json::Value, index: Index) -> Result<u64> {
fn delete_document_by_filter<'a>(
wtxn: &mut RwTxn<'a, '_>,
filter: &serde_json::Value,
index: &'a Index,
) -> Result<u64> {
let filter = Filter::from_json(filter)?;
Ok(if let Some(filter) = filter {
let mut wtxn = index.write_txn()?;
let candidates = filter.evaluate(&wtxn, &index).map_err(|err| match err {
let candidates = filter.evaluate(wtxn, index).map_err(|err| match err {
milli::Error::UserError(milli::UserError::InvalidFilter(_)) => {
Error::from(err).with_custom_error_code(Code::InvalidDocumentFilter)
}
e => e.into(),
})?;
let mut delete_operation = DeleteDocuments::new(&mut wtxn, &index)?;
let mut delete_operation = DeleteDocuments::new(wtxn, index)?;
delete_operation.delete_documents(&candidates);
let deleted_documents =
delete_operation.execute().map(|result| result.deleted_documents)?;
wtxn.commit()?;
deleted_documents
delete_operation.execute().map(|result| result.deleted_documents)?
} else {
0
})

View File

@ -15,6 +15,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
let IndexScheduler {
autobatching_enabled,
maximum_number_of_batched_tasks: _,
must_stop_processing: _,
processing_tasks,
file_store,

View File

@ -253,6 +253,9 @@ pub struct IndexSchedulerOptions {
/// Set to `true` iff the index scheduler is allowed to automatically
/// batch tasks together, to process multiple tasks at once.
pub autobatching_enabled: bool,
/// If the autobatcher is allowed to automatically batch tasks
/// it will only batch this defined number of tasks at once.
pub maximum_number_of_batched_tasks: usize,
/// The maximum number of tasks stored in the task queue before starting
/// to auto schedule task deletions.
pub max_number_of_tasks: usize,
@ -310,6 +313,9 @@ pub struct IndexScheduler {
/// Whether auto-batching is enabled or not.
pub(crate) autobatching_enabled: bool,
/// The maximum number of tasks that will be batched together.
pub(crate) maximum_number_of_batched_tasks: usize,
/// The max number of tasks allowed before the scheduler starts to delete
/// the finished tasks automatically.
pub(crate) max_number_of_tasks: usize,
@ -363,6 +369,7 @@ impl IndexScheduler {
index_mapper: self.index_mapper.clone(),
wake_up: self.wake_up.clone(),
autobatching_enabled: self.autobatching_enabled,
maximum_number_of_batched_tasks: self.maximum_number_of_batched_tasks,
max_number_of_tasks: self.max_number_of_tasks,
snapshots_path: self.snapshots_path.clone(),
dumps_path: self.dumps_path.clone(),
@ -458,6 +465,7 @@ impl IndexScheduler {
// we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
wake_up: Arc::new(SignalEvent::auto(true)),
autobatching_enabled: options.autobatching_enabled,
maximum_number_of_batched_tasks: options.maximum_number_of_batched_tasks,
max_number_of_tasks: options.max_number_of_tasks,
dumps_path: options.dumps_path,
snapshots_path: options.snapshots_path,
@ -1587,6 +1595,7 @@ mod tests {
index_count: 5,
indexer_config,
autobatching_enabled: true,
maximum_number_of_batched_tasks: usize::MAX,
max_number_of_tasks: 1_000_000,
instance_features: Default::default(),
};

View File

@ -285,6 +285,7 @@ impl From<Opt> for Infos {
db_path,
experimental_enable_metrics,
experimental_reduce_indexing_memory_usage,
experimental_limit_batched_tasks: _,
http_addr,
master_key: _,
env,

View File

@ -236,6 +236,7 @@ fn open_or_create_database_unchecked(
enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
indexer_config: (&opt.indexer_options).try_into()?,
autobatching_enabled: true,
maximum_number_of_batched_tasks: opt.experimental_limit_batched_tasks,
max_number_of_tasks: 1_000_000,
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,
index_count: DEFAULT_INDEX_COUNT,

View File

@ -51,6 +51,7 @@ const MEILI_LOG_LEVEL: &str = "MEILI_LOG_LEVEL";
const MEILI_EXPERIMENTAL_ENABLE_METRICS: &str = "MEILI_EXPERIMENTAL_ENABLE_METRICS";
const MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE: &str =
"MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE";
const MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS: &str = "MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS";
const DEFAULT_CONFIG_FILE_PATH: &str = "./config.toml";
const DEFAULT_DB_PATH: &str = "./data.ms";
@ -301,6 +302,11 @@ pub struct Opt {
#[serde(default)]
pub experimental_reduce_indexing_memory_usage: bool,
/// Experimental limit to the number of tasks per batch
#[clap(long, env = MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS, default_value_t = default_limit_batched_tasks())]
#[serde(default = "default_limit_batched_tasks")]
pub experimental_limit_batched_tasks: usize,
#[serde(flatten)]
#[clap(flatten)]
pub indexer_options: IndexerOpts,
@ -393,7 +399,8 @@ impl Opt {
#[cfg(all(not(debug_assertions), feature = "analytics"))]
no_analytics,
experimental_enable_metrics: enable_metrics_route,
experimental_reduce_indexing_memory_usage: reduce_indexing_memory_usage,
experimental_reduce_indexing_memory_usage,
experimental_limit_batched_tasks,
} = self;
export_to_env_if_not_present(MEILI_DB_PATH, db_path);
export_to_env_if_not_present(MEILI_HTTP_ADDR, http_addr);
@ -437,7 +444,11 @@ impl Opt {
);
export_to_env_if_not_present(
MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE,
reduce_indexing_memory_usage.to_string(),
experimental_reduce_indexing_memory_usage.to_string(),
);
export_to_env_if_not_present(
MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS,
experimental_limit_batched_tasks.to_string(),
);
indexer_options.export_to_env();
}
@ -739,6 +750,10 @@ fn default_dump_dir() -> PathBuf {
PathBuf::from(DEFAULT_DUMP_DIR)
}
fn default_limit_batched_tasks() -> usize {
usize::MAX
}
/// Indicates if a snapshot was scheduled, and if yes with which interval.
#[derive(Debug, Default, Copy, Clone, Deserialize, Serialize)]
pub enum ScheduleSnapshot {

View File

@ -154,6 +154,19 @@ async fn delete_document_by_filter() {
)
.await;
index.wait_task(1).await;
let (stats, _) = index.stats().await;
snapshot!(json_string!(stats), @r###"
{
"numberOfDocuments": 4,
"isIndexing": false,
"fieldDistribution": {
"color": 3,
"id": 4
}
}
"###);
let (response, code) =
index.delete_document_by_filter(json!({ "filter": "color = blue"})).await;
snapshot!(code, @"202 Accepted");
@ -188,6 +201,18 @@ async fn delete_document_by_filter() {
}
"###);
let (stats, _) = index.stats().await;
snapshot!(json_string!(stats), @r###"
{
"numberOfDocuments": 2,
"isIndexing": false,
"fieldDistribution": {
"color": 1,
"id": 2
}
}
"###);
let (documents, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(documents), @r###"
@ -241,6 +266,18 @@ async fn delete_document_by_filter() {
}
"###);
let (stats, _) = index.stats().await;
snapshot!(json_string!(stats), @r###"
{
"numberOfDocuments": 1,
"isIndexing": false,
"fieldDistribution": {
"color": 1,
"id": 1
}
}
"###);
let (documents, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(documents), @r###"