Compare commits

...

58 Commits

Author SHA1 Message Date
Mubelotix 7c2d478ef3 Merge branch 'main' into tasks-du-sheitan 2025-08-18 12:41:07 +02:00
Mubelotix 0c4cc4a00a Update tests 2025-08-12 19:27:54 +02:00
Mubelotix 28ac767f42 Fix #5832 2025-08-12 19:24:32 +02:00
Mubelotix 131823e4c5 Delete comment 2025-08-12 19:07:32 +02:00
Mubelotix ae73465eab Restore http code issues 2025-08-12 19:07:11 +02:00
Mubelotix 1f7689a3fb Remove unwrap 2025-08-12 19:00:43 +02:00
Mubelotix 0b89783a3f Load status tasks only once 2025-08-12 18:38:42 +02:00
Mubelotix 808b9c71fa Remove old dump file 2025-08-12 17:56:46 +02:00
Mubelotix 8990f2a14a Fix tests and format 2025-08-12 17:39:58 +02:00
Mubelotix fe275397ec Sweep old batches using dumpless upgrade 2025-08-12 16:48:28 +02:00
Mubelotix d7776ec82b Move function to utils 2025-08-12 16:47:21 +02:00
Mubelotix 84244a74df Fix bug 2025-08-12 16:44:28 +02:00
Mubelotix 3b470af54c Move code to the right file 2025-08-12 16:03:00 +02:00
Mubelotix 702a265888 Merge branch 'main' into tasks-du-sheitan 2025-08-12 15:44:00 +02:00
Mubelotix 7eb871d671 Fix old databases imported from dumps for #5827 2025-08-12 15:41:10 +02:00
Mubelotix eec711d93e Fix tests 2025-08-12 14:54:07 +02:00
Mubelotix 323d958bf6 Update test 2025-08-12 14:27:37 +02:00
Mubelotix e9007effea Fix bug 2025-08-12 14:21:14 +02:00
Mubelotix e0f5e05b22 Remove big comment 2025-08-12 14:07:06 +02:00
Mubelotix f54a15606e Fix test being shared 2025-08-12 14:06:56 +02:00
Mubelotix fee2fed044 Fix kinds not being deleted 2025-08-12 14:06:48 +02:00
Mubelotix 72ea3efdd5 Format 2025-08-12 13:47:54 +02:00
Mubelotix e1fa79e92a Optim O: Put instead of update 2025-08-12 13:47:40 +02:00
Mubelotix 915cd1f108 Optim N: Clone task ids instead of matched tasks 2025-08-12 12:11:30 +02:00
Mubelotix e5188699f9 Fix issue #5827 2025-08-12 11:59:00 +02:00
Mubelotix 1d46cb30f2 Add test for issue #5827 2025-08-12 11:52:39 +02:00
Mubelotix f5bd405935 Fix newly introduced bug 2025-08-12 11:42:15 +02:00
Mubelotix a156db4f28 Don't split read and write, it's bad for the cache 2025-08-12 11:03:11 +02:00
Mubelotix 0e9ac3ca92 Attempt 2025-08-12 10:48:37 +02:00
Mubelotix a54d7d2269 Optim M: Completely stop reading batches 2025-08-12 10:18:17 +02:00
Mubelotix 60a1188a58 Optim L: Delete datetimes by range 2025-08-12 09:33:38 +02:00
Mubelotix 62ad7b345a Make code more readable 2025-08-11 18:09:11 +02:00
Mubelotix 16c57414ad Split progress step in half 2025-08-11 17:52:59 +02:00
Mubelotix e23bf92502 Optim K: Make iterations sequential 2025-08-11 17:36:41 +02:00
Mubelotix 3fc09107ec Clean code 2025-08-11 15:48:58 +02:00
Mubelotix 68ad9d6021 Add test dump 2025-08-11 12:58:52 +02:00
Mubelotix 8cdf65bbbc Add new progress steps 2025-08-11 12:57:28 +02:00
Mubelotix 05c0b6ee6e Fix mistake 2025-08-08 16:04:10 +02:00
Mubelotix 1cb9816f44 Optim J: Use iterator on batch reader 2025-08-08 15:43:34 +02:00
Mubelotix 6f8d788aa8 Optim I: read batches before writing 2025-08-08 15:30:15 +02:00
Mubelotix 6a99c5b2f3 Optim H: Remove batches in iterator 2025-08-08 15:07:48 +02:00
Mubelotix cb36257537 Rename variable 2025-08-08 15:07:35 +02:00
Mubelotix ba0f0c3c30 Optim G: Use the datetime delete function on batches 2025-08-08 14:55:24 +02:00
Mubelotix 39eebac7e5 Optim F - Delay wtxn 2025-08-08 14:36:07 +02:00
Mubelotix 4c61d3a939 Optim E 2025-08-08 13:06:48 +02:00
Mubelotix eb8ff31513 Make it more idiomatic 2025-08-08 12:56:49 +02:00
Mubelotix 418730ef73 Optim D 2025-08-08 12:24:47 +02:00
Mubelotix 9a29a7790b Optim C 2025-08-08 11:57:20 +02:00
Mubelotix 14cb1bbbfb Optimization B 2025-08-08 11:32:17 +02:00
    Speeds up the subtask "deleting batches" by 9%
Mubelotix 31c4215ad2 Add debug assertion 2025-08-08 11:25:06 +02:00
Mubelotix ff68802ffc Optimization A 2025-08-08 11:24:04 +02:00
    Speeds up the "deleting tasks" subtask by 23%
Mubelotix 2d479332d3 Remove useless test function 2025-08-07 11:35:45 +02:00
Mubelotix ea894e6a2c Update tests 2025-08-07 11:30:01 +02:00
Mubelotix 4ae43eb51a Optimize code 2025-08-07 10:20:10 +02:00
Mubelotix 5309a37fb0 Fix many error code mistakes 2025-08-06 14:25:40 +02:00
Mubelotix 498b0b1419 Optimization 2025-08-06 12:33:53 +02:00
Mubelotix 0e9584672c Add test 2025-08-06 12:24:55 +02:00
Mubelotix 73e82d67d7 Add test 2025-08-06 11:14:07 +02:00
30 changed files with 930 additions and 227 deletions

View File

@@ -264,6 +264,7 @@ pub(crate) mod test {
use uuid::Uuid;
use crate::reader::Document;
use crate::writer::BatchWriter;
use crate::{DumpReader, DumpWriter, IndexMetadata, KindDump, TaskDump, Version};
pub fn create_test_instance_uid() -> Uuid {
@@ -468,7 +469,7 @@ pub(crate) mod test {
]
}
pub fn create_test_dump() -> File {
pub fn create_test_dump_writer() -> (DumpWriter, BatchWriter) {
let instance_uid = create_test_instance_uid();
let dump = DumpWriter::new(Some(instance_uid)).unwrap();
@@ -490,7 +491,6 @@ pub(crate) mod test {
for batch in &batches {
batch_queue.push_batch(batch).unwrap();
}
batch_queue.flush().unwrap();
// ========== pushing the task queue
let tasks = create_test_tasks();
@@ -524,6 +524,13 @@ pub(crate) mod test {
let network = create_test_network();
dump.create_network(network).unwrap();
(dump, batch_queue)
}
pub fn create_test_dump() -> File {
let (dump, batch_writer) = create_test_dump_writer();
batch_writer.flush().unwrap();
// create the dump
let mut file = tempfile::tempfile().unwrap();
dump.persist_to(&mut file).unwrap();

View File

@@ -229,12 +229,56 @@ impl From<CompatIndexV5ToV6> for DumpIndexReader {
#[cfg(test)]
pub(crate) mod test {
use std::fs::File;
use std::{fs::File, io::Seek};
use meili_snap::insta;
use meilisearch_types::{
batches::{Batch, BatchEnqueuedAt, BatchStats},
task_view::DetailsView,
tasks::{BatchStopReason, Kind, Status},
};
use time::macros::datetime;
use super::*;
use crate::reader::v6::RuntimeTogglableFeatures;
use crate::{reader::v6::RuntimeTogglableFeatures, test::create_test_dump_writer};
#[test]
fn import_dump_with_bad_batches() {
let (dump, mut batch_writer) = create_test_dump_writer();
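// A batch whose stats reference no surviving task in the dump: the reader's
// #5827 filter is expected to drop it on import.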
let bad_batch = Batch {
uid: 1,
progress: None,
details: DetailsView::default(),
stats: BatchStats {
total_nb_tasks: 1,
status: maplit::btreemap! { Status::Succeeded => 666 },
types: maplit::btreemap! { Kind::DocumentAdditionOrUpdate => 666 },
index_uids: maplit::btreemap! { "doggo".to_string() => 666 },
progress_trace: Default::default(),
write_channel_congestion: None,
internal_database_sizes: Default::default(),
},
embedder_stats: Default::default(),
enqueued_at: Some(BatchEnqueuedAt {
earliest: datetime!(2022-11-11 0:00 UTC),
oldest: datetime!(2022-11-11 0:00 UTC),
}),
started_at: datetime!(2022-11-20 0:00 UTC),
finished_at: Some(datetime!(2022-11-21 0:00 UTC)),
stop_reason: BatchStopReason::Unspecified.to_string(),
};
batch_writer.push_batch(&bad_batch).unwrap();
batch_writer.flush().unwrap();
let mut file = tempfile::tempfile().unwrap();
dump.persist_to(&mut file).unwrap();
file.rewind().unwrap();
let mut dump = DumpReader::open(file).unwrap();
let read_batches = dump.batches().unwrap().map(|b| b.unwrap()).collect::<Vec<_>>();
assert!(!read_batches.iter().any(|b| b.uid == 1));
}
#[test]
fn import_dump_v6_with_vectors() {

View File

@@ -5,6 +5,7 @@ use std::path::Path;
pub use meilisearch_types::milli;
use meilisearch_types::milli::vector::hf::OverridePooling;
use roaring::RoaringBitmap;
use tempfile::TempDir;
use time::OffsetDateTime;
use tracing::debug;
@@ -56,6 +57,7 @@ pub struct V6Reader {
instance_uid: Option<Uuid>,
metadata: Metadata,
tasks: BufReader<File>,
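// Second, independent handle on the same queue file, used by `batches()`
// to pre-scan task uids without consuming the `tasks` reader.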
tasks2: BufReader<File>,
batches: Option<BufReader<File>>,
keys: BufReader<File>,
features: Option<RuntimeTogglableFeatures>,
@@ -122,6 +124,7 @@ impl V6Reader {
metadata: serde_json::from_reader(&*meta_file)?,
instance_uid,
tasks: BufReader::new(File::open(dump.path().join("tasks").join("queue.jsonl"))?),
tasks2: BufReader::new(File::open(dump.path().join("tasks").join("queue.jsonl"))?),
batches,
keys: BufReader::new(File::open(dump.path().join("keys.jsonl"))?),
features,
@@ -187,12 +190,48 @@ impl V6Reader {
}))
}
fn tasks2(&mut self) -> Box<dyn Iterator<Item = Result<Task>> + '_> {
Box::new(
(&mut self.tasks2)
.lines()
.map(|line| -> Result<_> { Ok(serde_json::from_str(&line?)?) }),
)
}
pub fn batches(&mut self) -> Box<dyn Iterator<Item = Result<Batch>> + '_> {
// Get the batches, but filter out those whose tasks have all been deleted.
// This is due to bug #5827, which caused such batches not to be deleted before version 1.18.
let mut task_uids = RoaringBitmap::new();
let mut faulty = false;
for task in self.tasks2() {
let Ok(task) = task else {
// If we can't read the tasks, just give up trying to filter the batches
// The database may contain phantom batches, but that's not that big of a deal
faulty = true;
break;
};
task_uids.insert(task.uid);
}
match self.batches.as_mut() {
Some(batches) => Box::new((batches).lines().map(|line| -> Result<_> {
let batch = serde_json::from_str(&line?)?;
Ok(batch)
})),
Some(batches) => Box::new(
(batches)
.lines()
.map(|line| -> Result<_> {
let batch: meilisearch_types::batches::Batch =
serde_json::from_str(&line?)?;
Ok(batch)
})
.filter(move |batch| match batch {
Ok(batch) => {
faulty
|| batch.stats.status.values().any(|t| task_uids.contains(*t))
|| batch.stats.types.values().any(|t| task_uids.contains(*t))
|| batch.stats.index_uids.values().any(|t| task_uids.contains(*t))
}
Err(_) => true,
}),
),
None => Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Result<Batch>> + '_>,
}
}
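The filter above keeps a batch only when its stats still reference at least one surviving task, so dumps written before 1.18 import cleanly: phantom batches left behind by #5827 simply disappear on read. Here is a minimal sketch of that idea in isolation, assuming a simplified, hypothetical SimpleBatch that lists its task uids directly (the real reader derives this from the batch stats maps):

use roaring::RoaringBitmap;

// Hypothetical, simplified batch type; the real `Batch` stores stats maps instead.
struct SimpleBatch {
    task_uids: Vec<u32>,
}

/// Keep only the batches that still reference at least one surviving task.
fn drop_phantom_batches(
    batches: Vec<SimpleBatch>,
    surviving_tasks: &RoaringBitmap,
) -> Vec<SimpleBatch> {
    batches
        .into_iter()
        .filter(|batch| batch.task_uids.iter().any(|uid| surviving_tasks.contains(*uid)))
        .collect()
}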

View File

@@ -727,9 +727,7 @@ impl IndexScheduler {
// If the registered task is a task cancelation
// we inform the processing tasks to stop (if necessary).
if let KindWithContent::TaskCancelation { tasks, .. } = kind {
let tasks_to_cancel = RoaringBitmap::from_iter(tasks);
if self.processing_tasks.read().unwrap().must_cancel_processing_tasks(&tasks_to_cancel)
{
if self.processing_tasks.read().unwrap().must_cancel_processing_tasks(&tasks) {
self.scheduler.must_stop_processing.must_stop();
}
}

View File

@@ -81,8 +81,12 @@ make_enum_progress! {
make_enum_progress! {
pub enum TaskDeletionProgress {
RetrievingTasks,
RetrievingBatchTasks,
DeletingTasksDateTime,
DeletingBatchesDateTime,
DeletingTasksMetadata,
DeletingBatchesMetadata,
DeletingTasks,
DeletingBatches,
}

View File

@@ -66,7 +66,7 @@ impl BatchQueue {
NUMBER_OF_DATABASES
}
pub(super) fn new(env: &Env<WithoutTls>, wtxn: &mut RwTxn) -> Result<Self> {
pub(crate) fn new(env: &Env<WithoutTls>, wtxn: &mut RwTxn) -> Result<Self> {
Ok(Self {
all_batches: env.create_database(wtxn, Some(db_name::ALL_BATCHES))?,
status: env.create_database(wtxn, Some(db_name::BATCH_STATUS))?,
@@ -127,7 +127,12 @@ impl BatchQueue {
status: Status,
bitmap: &RoaringBitmap,
) -> Result<()> {
Ok(self.status.put(wtxn, &status, bitmap)?)
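// Writing an empty bitmap would leave a dangling status entry behind, so
// delete the key instead (the same fix applies to kinds and indexes).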
if bitmap.is_empty() {
self.status.delete(wtxn, &status)?;
} else {
self.status.put(wtxn, &status, bitmap)?;
}
Ok(())
}
pub(crate) fn update_status(

View File

@@ -32,7 +32,7 @@ use crate::{Error, IndexSchedulerOptions, Result, TaskId};
/// The number of database used by queue itself
const NUMBER_OF_DATABASES: u32 = 1;
/// Database const names for the `IndexScheduler`.
mod db_name {
pub(crate) mod db_name {
pub const BATCH_TO_TASKS_MAPPING: &str = "batch-to-tasks-mapping";
}

View File

@@ -383,7 +383,11 @@ impl Queue {
// tasks that are not processing. The non-processing ones are filtered normally while the processing ones
// are entirely removed unless the in-memory startedAt variable falls within the date filter.
// Once we have filtered the two subsets, we put them back together and assign the result back to `tasks`.
tasks = {
tasks = 'started_at: {
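// Nothing to filter when no startedAt bound was requested.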
if after_started_at.is_none() && before_started_at.is_none() {
break 'started_at tasks;
}
let (mut filtered_non_processing_tasks, mut filtered_processing_tasks) =
(&tasks - &**processing_tasks, &tasks & &**processing_tasks);

View File

@@ -2,14 +2,14 @@ use std::collections::{BTreeSet, HashMap, HashSet};
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::atomic::Ordering;
use meilisearch_types::batches::{BatchEnqueuedAt, BatchId};
use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::batches::BatchId;
use meilisearch_types::heed::{Database, RoTxn, RwTxn};
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::milli::{self, ChannelCongestion};
use meilisearch_types::milli::{self, CboRoaringBitmapCodec, ChannelCongestion};
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
use milli::update::Settings as MilliSettings;
use roaring::RoaringBitmap;
use roaring::{MultiOps, RoaringBitmap};
use time::OffsetDateTime;
use super::create_batch::Batch;
@@ -18,11 +18,8 @@ use crate::processing::{
InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress,
UpdateIndexProgress,
};
use crate::utils::{
self, remove_n_tasks_datetime_earlier_than, remove_task_datetime, swap_index_uid_in_task,
ProcessingBatch,
};
use crate::{Error, IndexScheduler, Result, TaskId};
use crate::utils::{consecutive_ranges, swap_index_uid_in_task, ProcessingBatch};
use crate::{Error, IndexScheduler, Result, TaskId, BEI128};
#[derive(Debug, Default)]
pub struct ProcessBatchInfo {
@@ -102,10 +99,7 @@ impl IndexScheduler {
}
}
let mut wtxn = self.env.write_txn()?;
let mut deleted_tasks =
self.delete_matched_tasks(&mut wtxn, &matched_tasks, &progress)?;
wtxn.commit()?;
let mut deleted_tasks = self.delete_matched_tasks(&matched_tasks, &progress)?;
for task in tasks.iter_mut() {
task.status = Status::Succeeded;
@@ -127,6 +121,12 @@ impl IndexScheduler {
_ => unreachable!(),
}
}
debug_assert!(
deleted_tasks.is_empty(),
"There should be no tasks left to delete after processing the batch"
);
Ok((tasks, ProcessBatchInfo::default()))
}
Batch::SnapshotCreation(tasks) => self
@@ -567,102 +567,339 @@ impl IndexScheduler {
/// Delete each given task from all the databases (if it is deletable).
///
/// Return the set of tasks that were actually deleted.
#[allow(clippy::reversed_empty_ranges)]
fn delete_matched_tasks(
&self,
wtxn: &mut RwTxn,
matched_tasks: &RoaringBitmap,
progress: &Progress,
) -> Result<RoaringBitmap> {
progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime);
fn remove_task_datetimes(
wtxn: &mut RwTxn<'_>,
mut to_remove: HashMap<i128, RoaringBitmap>,
db: Database<BEI128, CboRoaringBitmapCodec>,
) -> Result<()> {
if to_remove.is_empty() {
return Ok(());
}
let min = to_remove.keys().min().cloned().unwrap(); // to_remove isn't empty so this is ok
let max = to_remove.keys().max().cloned().unwrap();
let range = min..=max;
// We iterate over the time database to see which ranges of timestamps need to be removed
let lazy_db = db.lazily_decode_data();
let iter = lazy_db.range(wtxn, &range)?;
let mut delete_range_start = None;
let mut delete_ranges = Vec::new();
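// Timestamps whose bitmaps end up empty are coalesced into contiguous
// ranges, so each run costs one `delete_range` call instead of a delete
// per key ("Optim L" in the commit list).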
let mut to_put: HashMap<i128, RoaringBitmap> = HashMap::new();
for i in iter {
let (timestamp, data) = i?;
if let Some(to_remove) = to_remove.remove(&timestamp) {
let mut current =
data.decode().map_err(|e| Error::Anyhow(anyhow::anyhow!(e)))?;
current -= &to_remove;
if current.is_empty() {
if delete_range_start.is_none() {
delete_range_start = Some(timestamp);
}
} else {
// We could close the deletion range but it's not necessary because the new value will get reinserted anyway
to_put.insert(timestamp, current);
}
} else if let Some(delete_range_start) = delete_range_start.take() {
// Current one must not be deleted so we need to skip it
delete_ranges.push(delete_range_start..timestamp);
}
}
if let Some(delete_range_start) = delete_range_start.take() {
delete_ranges.push(delete_range_start..(max + 1));
}
for range in delete_ranges {
db.delete_range(wtxn, &range)?;
}
for (timestamp, data) in to_put {
db.put(wtxn, &timestamp, &data)?;
}
Ok(())
}
fn remove_batch_datetimes(
wtxn: &mut RwTxn<'_>,
to_remove: &RoaringBitmap,
db: Database<BEI128, CboRoaringBitmapCodec>,
) -> Result<()> {
if to_remove.is_empty() {
return Ok(());
}
// We iterate over the time database to see which ranges of timestamps need to be removed
let iter = db.iter(wtxn)?;
let mut delete_range_start = None;
let mut delete_ranges = Vec::new();
let mut to_put: HashMap<i128, RoaringBitmap> = HashMap::new();
for i in iter {
let (timestamp, mut current) = i?;
if !current.is_disjoint(to_remove) {
current -= to_remove;
if current.is_empty() {
if delete_range_start.is_none() {
delete_range_start = Some(timestamp);
}
} else {
// We could close the deletion range but it's not necessary because the new value will get reinserted anyway
to_put.insert(timestamp, current);
}
} else if let Some(delete_range_start) = delete_range_start.take() {
// Current one must not be deleted so we need to skip it
delete_ranges.push(delete_range_start..timestamp);
}
}
if let Some(delete_range_start) = delete_range_start.take() {
delete_ranges.push(delete_range_start..i128::MAX);
}
for range in delete_ranges {
db.delete_range(wtxn, &range)?;
}
for (timestamp, data) in to_put {
db.put(wtxn, &timestamp, &data)?;
}
Ok(())
}
progress.update_progress(TaskDeletionProgress::RetrievingTasks);
let rtxn = self.env.read_txn()?;
// 1. Remove from this list the tasks that we are not allowed to delete
let enqueued_tasks = self.queue.tasks.get_status(wtxn, Status::Enqueued)?;
let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone();
let all_task_ids = self.queue.tasks.all_task_ids(wtxn)?;
let mut to_delete_tasks = all_task_ids & matched_tasks;
let mut status_tasks = HashMap::new();
for status in enum_iterator::all::<Status>() {
status_tasks.insert(status, self.queue.tasks.get_status(&rtxn, status)?);
}
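// Load every status bitmap once up front so the later steps can reuse
// them instead of re-reading the database.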
let enqueued_tasks = status_tasks.get(&Status::Enqueued).unwrap(); // We added all statuses
let all_task_ids = status_tasks.values().union();
let mut to_remove_from_statuses = HashMap::new();
let mut to_delete_tasks = all_task_ids.clone() & matched_tasks;
to_delete_tasks -= &**processing_tasks;
to_delete_tasks -= &enqueued_tasks;
to_delete_tasks -= enqueued_tasks;
// 2. We now have a list of tasks to delete, delete them
// 2. We now have a list of tasks to delete. Read their metadata to list what needs to be updated.
let mut affected_indexes = HashSet::new();
let mut affected_statuses = HashSet::new();
let mut affected_kinds = HashSet::new();
let mut affected_canceled_by = RoaringBitmap::new();
// The tasks that have been removed *per batches*.
let mut affected_batches: HashMap<BatchId, RoaringBitmap> = HashMap::new();
let mut affected_batches: HashMap<BatchId, RoaringBitmap> = HashMap::new(); // The tasks that have been removed *per batch*.
let mut tasks_enqueued_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
let mut tasks_started_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
let mut tasks_finished_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32);
progress.update_progress(task_progress);
for task_id in to_delete_tasks.iter() {
let task =
self.queue.tasks.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
for range in consecutive_ranges(to_delete_tasks.iter()) {
let iter = self.queue.tasks.all_tasks.range(&rtxn, &range)?;
for task in iter {
let (task_id, task) = task?;
affected_indexes.extend(task.indexes().into_iter().map(|x| x.to_owned()));
affected_statuses.insert(task.status);
affected_kinds.insert(task.kind.as_kind());
// Note: don't delete the persisted task data since
// we can only delete succeeded, failed, and canceled tasks.
// In each of those cases, the persisted data is supposed to
// have been deleted already.
utils::remove_task_datetime(
wtxn,
self.queue.tasks.enqueued_at,
task.enqueued_at,
task.uid,
)?;
if let Some(started_at) = task.started_at {
utils::remove_task_datetime(
wtxn,
self.queue.tasks.started_at,
started_at,
task.uid,
)?;
affected_indexes.extend(task.indexes().into_iter().map(|x| x.to_owned()));
affected_statuses.insert(task.status);
affected_kinds.insert(task.kind.as_kind());
let enqueued_at = task.enqueued_at.unix_timestamp_nanos();
tasks_enqueued_to_remove.entry(enqueued_at).or_default().insert(task_id);
if let Some(started_at) = task.started_at {
tasks_started_to_remove
.entry(started_at.unix_timestamp_nanos())
.or_default()
.insert(task_id);
}
if let Some(finished_at) = task.finished_at {
tasks_finished_to_remove
.entry(finished_at.unix_timestamp_nanos())
.or_default()
.insert(task_id);
}
if let Some(canceled_by) = task.canceled_by {
affected_canceled_by.insert(canceled_by);
}
if let Some(batch_uid) = task.batch_uid {
affected_batches.entry(batch_uid).or_default().insert(task_id);
}
atomic_progress.fetch_add(1, Ordering::Relaxed);
}
if let Some(finished_at) = task.finished_at {
utils::remove_task_datetime(
wtxn,
self.queue.tasks.finished_at,
finished_at,
task.uid,
)?;
}
if let Some(canceled_by) = task.canceled_by {
affected_canceled_by.insert(canceled_by);
}
if let Some(batch_uid) = task.batch_uid {
affected_batches.entry(batch_uid).or_default().insert(task_id);
}
atomic_progress.fetch_add(1, Ordering::Relaxed);
}
// 3. Read the tasks by indexes, statuses and kinds
let mut affected_indexes_tasks = HashMap::new();
for index in &affected_indexes {
affected_indexes_tasks
.insert(index.as_str(), self.queue.tasks.index_tasks(&rtxn, index)?);
}
let mut to_remove_from_indexes = HashMap::new();
let mut affected_kinds_tasks = HashMap::new();
for kind in &affected_kinds {
affected_kinds_tasks.insert(*kind, self.queue.tasks.get_kind(&rtxn, *kind)?);
}
let mut to_remove_from_kinds = HashMap::new();
// 4. Read affected batches' tasks
let mut to_delete_batches = RoaringBitmap::new();
let affected_batches_bitmap = RoaringBitmap::from_iter(affected_batches.keys());
progress.update_progress(TaskDeletionProgress::RetrievingBatchTasks);
let (atomic_progress, task_progress) =
AtomicBatchStep::new(affected_batches_bitmap.len() as u32);
progress.update_progress(task_progress);
for range in consecutive_ranges(affected_batches_bitmap.iter()) {
let iter = self.queue.batch_to_tasks_mapping.range(&rtxn, &range)?;
for batch in iter {
let (batch_id, mut tasks) = batch?;
let to_delete_tasks = affected_batches.remove(&batch_id).unwrap_or_default();
tasks -= &to_delete_tasks;
// Note: we never delete tasks from the mapping. It's error-prone but intentional (perf)
// We make sure to filter the tasks from the mapping when we read them.
tasks &= &all_task_ids;
// We must remove the batch entirely
if tasks.is_empty() {
to_delete_batches.insert(batch_id);
}
// We must remove the batch from all the reverse indexes it no longer has tasks for.
for (index, index_tasks) in affected_indexes_tasks.iter() {
if index_tasks.is_disjoint(&tasks) {
to_remove_from_indexes
.entry(index)
.or_insert_with(RoaringBitmap::new)
.insert(batch_id);
}
}
for (status, status_tasks) in status_tasks.iter() {
if status_tasks.is_disjoint(&tasks) {
to_remove_from_statuses
.entry(*status)
.or_insert_with(RoaringBitmap::new)
.insert(batch_id);
}
}
for (kind, kind_tasks) in affected_kinds_tasks.iter() {
if kind_tasks.is_disjoint(&tasks) {
to_remove_from_kinds
.entry(*kind)
.or_insert_with(RoaringBitmap::new)
.insert(batch_id);
}
}
// Note: no need to delete the persisted task data since
// we can only delete succeeded, failed, and canceled tasks.
// In each of those cases, the persisted data is supposed to
// have been deleted already.
atomic_progress.fetch_add(1, Ordering::Relaxed);
}
}
drop(rtxn);
let mut owned_wtxn = self.env.write_txn()?;
let wtxn = &mut owned_wtxn;
// 7. Remove task datetimes
progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime);
remove_task_datetimes(wtxn, tasks_enqueued_to_remove, self.queue.tasks.enqueued_at)?;
remove_task_datetimes(wtxn, tasks_started_to_remove, self.queue.tasks.started_at)?;
remove_task_datetimes(wtxn, tasks_finished_to_remove, self.queue.tasks.finished_at)?;
// 8. Delete batches datetimes
progress.update_progress(TaskDeletionProgress::DeletingBatchesDateTime);
remove_batch_datetimes(wtxn, &to_delete_batches, self.queue.batches.enqueued_at)?;
remove_batch_datetimes(wtxn, &to_delete_batches, self.queue.batches.started_at)?;
remove_batch_datetimes(wtxn, &to_delete_batches, self.queue.batches.finished_at)?;
// 9. Remove batches metadata from indexes, statuses, and kinds
progress.update_progress(TaskDeletionProgress::DeletingBatchesMetadata);
for (index, batches) in to_remove_from_indexes {
self.queue.batches.update_index(wtxn, index, |b| {
*b -= &batches;
})?;
}
for (status, batches) in to_remove_from_statuses {
self.queue.batches.update_status(wtxn, status, |b| {
*b -= &batches;
})?;
}
for (kind, batches) in to_remove_from_kinds {
self.queue.batches.update_kind(wtxn, kind, |b| {
*b -= &batches;
})?;
}
// 10. Remove tasks from indexes, statuses, and kinds
progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata);
let (atomic_progress, task_progress) = AtomicTaskStep::new(
(affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u32,
);
progress.update_progress(task_progress);
for index in affected_indexes.iter() {
self.queue.tasks.update_index(wtxn, index, |bitmap| *bitmap -= &to_delete_tasks)?;
for (index, mut tasks) in affected_indexes_tasks.into_iter() {
tasks -= &to_delete_tasks;
if tasks.is_empty() {
self.queue.tasks.index_tasks.delete(wtxn, index)?;
} else {
self.queue.tasks.index_tasks.put(wtxn, index, &tasks)?;
}
atomic_progress.fetch_add(1, Ordering::Relaxed);
}
for status in affected_statuses.iter() {
self.queue.tasks.update_status(wtxn, *status, |bitmap| *bitmap -= &to_delete_tasks)?;
for status in affected_statuses.into_iter() {
let mut tasks = status_tasks.remove(&status).unwrap(); // we inserted all statuses above
tasks -= &to_delete_tasks;
if tasks.is_empty() {
self.queue.tasks.status.delete(wtxn, &status)?;
} else {
self.queue.tasks.status.put(wtxn, &status, &tasks)?;
}
atomic_progress.fetch_add(1, Ordering::Relaxed);
}
for kind in affected_kinds.iter() {
self.queue.tasks.update_kind(wtxn, *kind, |bitmap| *bitmap -= &to_delete_tasks)?;
for (kind, mut tasks) in affected_kinds_tasks.into_iter() {
tasks -= &to_delete_tasks;
if tasks.is_empty() {
self.queue.tasks.kind.delete(wtxn, &kind)?;
} else {
self.queue.tasks.kind.put(wtxn, &kind, &tasks)?;
}
atomic_progress.fetch_add(1, Ordering::Relaxed);
}
// 11. Delete tasks
progress.update_progress(TaskDeletionProgress::DeletingTasks);
let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32);
let (atomic_progress, task_progress) =
AtomicTaskStep::new((to_delete_tasks.len() + affected_canceled_by.len()) as u32);
progress.update_progress(task_progress);
for task in to_delete_tasks.iter() {
self.queue.tasks.all_tasks.delete(wtxn, &task)?;
atomic_progress.fetch_add(1, Ordering::Relaxed);
for range in consecutive_ranges(to_delete_tasks.iter()) {
self.queue.tasks.all_tasks.delete_range(wtxn, &range)?;
atomic_progress.fetch_add(range.size_hint().0 as u32, Ordering::Relaxed);
}
for canceled_by in affected_canceled_by {
if let Some(mut tasks) = self.queue.tasks.canceled_by.get(wtxn, &canceled_by)? {
tasks -= &to_delete_tasks;
@@ -672,96 +909,21 @@ impl IndexScheduler {
self.queue.tasks.canceled_by.put(wtxn, &canceled_by, &tasks)?;
}
}
}
progress.update_progress(TaskDeletionProgress::DeletingBatches);
let (atomic_progress, batch_progress) = AtomicBatchStep::new(affected_batches.len() as u32);
progress.update_progress(batch_progress);
for (batch_id, to_delete_tasks) in affected_batches {
if let Some(mut tasks) = self.queue.batch_to_tasks_mapping.get(wtxn, &batch_id)? {
tasks -= &to_delete_tasks;
// We must remove the batch entirely
if tasks.is_empty() {
if let Some(batch) = self.queue.batches.get_batch(wtxn, batch_id)? {
if let Some(BatchEnqueuedAt { earliest, oldest }) = batch.enqueued_at {
remove_task_datetime(
wtxn,
self.queue.batches.enqueued_at,
earliest,
batch_id,
)?;
remove_task_datetime(
wtxn,
self.queue.batches.enqueued_at,
oldest,
batch_id,
)?;
} else {
// If we don't have the enqueued at in the batch it means the database comes from the v1.12
// and we still need to find the date by scrolling the database
remove_n_tasks_datetime_earlier_than(
wtxn,
self.queue.batches.enqueued_at,
batch.started_at,
batch.stats.total_nb_tasks.clamp(1, 2) as usize,
batch_id,
)?;
}
remove_task_datetime(
wtxn,
self.queue.batches.started_at,
batch.started_at,
batch_id,
)?;
if let Some(finished_at) = batch.finished_at {
remove_task_datetime(
wtxn,
self.queue.batches.finished_at,
finished_at,
batch_id,
)?;
}
self.queue.batches.all_batches.delete(wtxn, &batch_id)?;
self.queue.batch_to_tasks_mapping.delete(wtxn, &batch_id)?;
}
}
// Anyway, we must remove the batch from all its reverse indexes.
// The only way to do that is to check
for index in affected_indexes.iter() {
let index_tasks = self.queue.tasks.index_tasks(wtxn, index)?;
let remaining_index_tasks = index_tasks & &tasks;
if remaining_index_tasks.is_empty() {
self.queue.batches.update_index(wtxn, index, |bitmap| {
bitmap.remove(batch_id);
})?;
}
}
for status in affected_statuses.iter() {
let status_tasks = self.queue.tasks.get_status(wtxn, *status)?;
let remaining_status_tasks = status_tasks & &tasks;
if remaining_status_tasks.is_empty() {
self.queue.batches.update_status(wtxn, *status, |bitmap| {
bitmap.remove(batch_id);
})?;
}
}
for kind in affected_kinds.iter() {
let kind_tasks = self.queue.tasks.get_kind(wtxn, *kind)?;
let remaining_kind_tasks = kind_tasks & &tasks;
if remaining_kind_tasks.is_empty() {
self.queue.batches.update_kind(wtxn, *kind, |bitmap| {
bitmap.remove(batch_id);
})?;
}
}
}
atomic_progress.fetch_add(1, Ordering::Relaxed);
}
// 12. Delete batches
progress.update_progress(TaskDeletionProgress::DeletingBatches);
let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_batches.len() as u32);
progress.update_progress(task_progress);
for range in consecutive_ranges(to_delete_batches.iter()) {
self.queue.batches.all_batches.delete_range(wtxn, &range)?;
self.queue.batch_to_tasks_mapping.delete_range(wtxn, &range)?;
atomic_progress.fetch_add(range.size_hint().0 as u32, Ordering::Relaxed);
}
owned_wtxn.commit()?;
Ok(to_delete_tasks)
}

View File

@@ -83,12 +83,6 @@ impl IndexScheduler {
t.finished_at = Some(finished_at);
}
// Patch the task to remove the batch uid, because as of v1.12.5 batches are not persisted.
// This prevents referencing *future* batches not actually associated with the task.
//
// See <https://github.com/meilisearch/meilisearch/issues/5247> for details.
t.batch_uid = None;
let mut dump_content_file = dump_tasks.push_task(&t.into())?;
// 3.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.

View File

@@ -20,7 +20,6 @@ failed [3,]
### Kind:
"indexCreation" [1,2,3,4,]
"taskDeletion" [5,]
"upgradeDatabase" []
----------------------------------------------------------------------
### Index Tasks:
catto [1,]

View File

@@ -50,7 +50,6 @@ doggo [2,3,]
----------------------------------------------------------------------
### Batches Status:
succeeded [0,]
failed []
----------------------------------------------------------------------
### Batches Kind:
"upgradeDatabase" [0,]

View File

@@ -3,6 +3,7 @@ use meili_snap::snapshot;
use meilisearch_types::milli::obkv_to_json;
use meilisearch_types::milli::update::IndexDocumentsMethod::*;
use meilisearch_types::tasks::KindWithContent;
use roaring::RoaringBitmap;
use crate::insta_snapshot::snapshot_index_scheduler;
use crate::test_utils::read_json;
@@ -1148,3 +1149,82 @@ fn test_document_addition_with_set_and_null_primary_key_inference_works() {
.collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
}
#[test]
fn test_task_deletion_issue_5827() {
// 1. We're going to autobatch 2 document additions
// 2. We will delete the first task
// 3. We will delete the second task
// 4. The batch should be gone
let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]);
let mut tasks = Vec::new();
for i in 0..2 {
let content = format!(
r#"{{
"id": {},
"doggo": "bob {}"
}}"#,
i, i
);
let (uuid, mut file) = index_scheduler.queue.create_update_file_with_uuid(i).unwrap();
let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
file.persist().unwrap();
let task = index_scheduler
.register(
KindWithContent::DocumentAdditionOrUpdate {
index_uid: S("doggos"),
primary_key: Some(S("id")),
method: ReplaceDocuments,
content_file: uuid,
documents_count,
allow_index_creation: true,
},
None,
false,
)
.unwrap();
tasks.push(task);
index_scheduler.assert_internally_consistent();
}
handle.advance_one_successful_batch();
let rtxn = index_scheduler.read_txn().unwrap();
let batches = index_scheduler.queue.batches.all_batch_ids(&rtxn).unwrap();
assert_eq!(batches.into_iter().collect::<Vec<_>>().as_slice(), &[0]);
index_scheduler
.register(
KindWithContent::TaskDeletion {
query: String::from("whatever"),
tasks: RoaringBitmap::from_iter([tasks[0].uid]),
},
None,
false,
)
.unwrap();
handle.advance_one_successful_batch();
let rtxn = index_scheduler.read_txn().unwrap();
let batches = index_scheduler.queue.batches.all_batch_ids(&rtxn).unwrap();
assert_eq!(batches.into_iter().collect::<Vec<_>>().as_slice(), &[0, 1]);
index_scheduler
.register(
KindWithContent::TaskDeletion {
query: String::from("whatever"),
tasks: RoaringBitmap::from_iter([tasks[1].uid]),
},
None,
false,
)
.unwrap();
handle.advance_one_successful_batch();
let rtxn = index_scheduler.read_txn().unwrap();
let batches = index_scheduler.queue.batches.all_batch_ids(&rtxn).unwrap();
assert_eq!(batches.into_iter().collect::<Vec<_>>().as_slice(), &[1, 2]);
let batch0 = index_scheduler.queue.batches.get_batch(&rtxn, 0).unwrap();
assert!(batch0.is_none(), "Batch 0 should have been deleted");
}

View File

@@ -7,6 +7,9 @@ use tracing::info;
use crate::queue::TaskQueue;
use crate::versioning::Versioning;
use v1_18::V1_17_To_V1_18_0;
mod v1_18;
trait UpgradeIndexScheduler {
fn upgrade(
@@ -29,7 +32,8 @@ pub fn upgrade_index_scheduler(
let current_patch = to.2;
let upgrade_functions: &[&dyn UpgradeIndexScheduler] = &[
// This is the last upgrade function, it will be called when the index is up to date.
&V1_17_To_V1_18_0 {},
// This is the last upgrade function, it will be called when the scheduler is up to date.
// any other upgrade function should be added before this one.
&ToCurrentNoOp {},
];
@@ -40,6 +44,7 @@ pub fn upgrade_index_scheduler(
(1, 14, _) => 0,
(1, 15, _) => 0,
(1, 16, _) => 0,
(1, 17, _) => 0,
(major, minor, patch) => {
if major > current_major
|| (major == current_major && minor > current_minor)

View File

@@ -0,0 +1,62 @@
use meilisearch_types::{
heed::Database,
milli::{CboRoaringBitmapCodec, BEU32},
};
use tracing::info;
use super::UpgradeIndexScheduler;
use crate::queue::{db_name::BATCH_TO_TASKS_MAPPING, BatchQueue};
#[allow(non_camel_case_types)]
pub(super) struct V1_17_To_V1_18_0();
impl UpgradeIndexScheduler for V1_17_To_V1_18_0 {
fn upgrade(
&self,
env: &meilisearch_types::heed::Env<meilisearch_types::heed::WithoutTls>,
wtxn: &mut meilisearch_types::heed::RwTxn,
_original: (u32, u32, u32),
) -> anyhow::Result<()> {
let batch_queue = BatchQueue::new(env, wtxn)?;
let all_batch_ids = batch_queue.all_batch_ids(wtxn)?;
let batch_to_tasks_mapping: Database<BEU32, CboRoaringBitmapCodec> =
env.create_database(wtxn, Some(BATCH_TO_TASKS_MAPPING))?;
let all_batches = batch_queue.all_batches.lazily_decode_data();
let iter = all_batches.iter(wtxn)?;
let mut range_start = None;
let mut count = 0;
let mut ranges = Vec::new();
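// Ids present in `all_batches` but missing from `all_batch_ids` belong to
// batches that #5827 failed to delete; collect them as contiguous ranges
// so each run can be dropped with a single `delete_range` call.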
for batch in iter {
let (batch_id, _) = batch?;
if !all_batch_ids.contains(batch_id) {
count += 1;
if range_start.is_none() {
range_start = Some(batch_id);
}
} else if let Some(start) = range_start.take() {
ranges.push(start..batch_id);
}
}
if let Some(start) = range_start {
ranges.push(start..u32::MAX);
}
if !ranges.is_empty() {
info!("Removing {count} batches that were not properly removed in previous versions due to #5827.");
}
for range in ranges {
batch_queue.all_batches.delete_range(wtxn, &range)?;
batch_to_tasks_mapping.delete_range(wtxn, &range)?;
}
Ok(())
}
fn target_version(&self) -> (u32, u32, u32) {
(1, 18, 0)
}
}

View File

@@ -2,7 +2,7 @@
use crate::milli::progress::EmbedderStats;
use std::collections::{BTreeSet, HashSet};
use std::ops::Bound;
use std::ops::{Bound, RangeInclusive};
use std::sync::Arc;
use meilisearch_types::batches::{Batch, BatchEnqueuedAt, BatchId, BatchStats};
@@ -159,6 +159,30 @@ impl ProcessingBatch {
}
}
/// Given a **sorted** iterator of `u32`, return an iterator of the ranges of consecutive values it contains.
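/// For example, `[1, 2, 3, 7, 8, 10]` yields `1..=3`, `7..=8`, then `10..=10`.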
pub(crate) fn consecutive_ranges(
iter: impl IntoIterator<Item = u32>,
) -> impl Iterator<Item = RangeInclusive<u32>> {
let mut iter = iter.into_iter().peekable();
std::iter::from_fn(move || {
let start = iter.next()?;
let mut end = start;
while let Some(&next) = iter.peek() {
if next == end + 1 {
end = next;
iter.next();
} else {
break;
}
}
Some(start..=end)
})
}
pub(crate) fn insert_task_datetime(
wtxn: &mut RwTxn,
database: Database<BEI128, CboRoaringBitmapCodec>,
@@ -168,7 +192,7 @@ pub(crate) fn insert_task_datetime(
let timestamp = time.unix_timestamp_nanos();
let mut task_ids = database.get(wtxn, &timestamp)?.unwrap_or_default();
task_ids.insert(task_id);
database.put(wtxn, &timestamp, &RoaringBitmap::from_iter(task_ids))?;
database.put(wtxn, &timestamp, &task_ids)?;
Ok(())
}
@@ -184,7 +208,7 @@ pub(crate) fn remove_task_datetime(
if existing.is_empty() {
database.delete(wtxn, &timestamp)?;
} else {
database.put(wtxn, &timestamp, &RoaringBitmap::from_iter(existing))?;
database.put(wtxn, &timestamp, &existing)?;
}
}

View File

@@ -47,7 +47,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
tag = "Export",
security(("Bearer" = ["export", "*"])),
responses(
(status = 202, description = "Export successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
(status = OK, description = "Export successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
{
"taskUid": 1,
"status": "enqueued",
@@ -106,6 +106,7 @@ async fn export(
analytics.publish(analytics_aggregate, &req);
// FIXME: This should be 202 Accepted, but changing it would be breaking so we need to wait for 2.0
Ok(HttpResponse::Ok().json(task))
}

View File

@@ -310,7 +310,7 @@ impl Aggregate for DocumentsDeletionAggregator {
("documentId" = String, Path, example = "853", description = "Document Identifier", nullable = false),
),
responses(
(status = 200, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
(status = ACCEPTED, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
{
"taskUid": 147,
"indexUid": null,
@@ -427,7 +427,7 @@ pub struct BrowseQuery {
params(("indexUid", example = "movies", description = "Index Unique Identifier", nullable = false)),
request_body = BrowseQuery,
responses(
(status = 200, description = "Task successfully enqueued", body = PaginationView<serde_json::Value>, content_type = "application/json", example = json!(
(status = 200, description = "Documents returned", body = PaginationView<serde_json::Value>, content_type = "application/json", example = json!(
{
"results":[
{
@@ -745,7 +745,7 @@ impl<Method: AggregateMethod> Aggregate for DocumentsAggregator<Method> {
),
request_body = serde_json::Value,
responses(
(status = 200, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
(status = ACCEPTED, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
{
"taskUid": 147,
"indexUid": null,
@@ -846,7 +846,7 @@ pub async fn replace_documents(
),
request_body = serde_json::Value,
responses(
(status = 200, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
(status = ACCEPTED, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
{
"taskUid": 147,
"indexUid": null,
@@ -1112,7 +1112,7 @@ async fn copy_body_to_file(
),
request_body = Vec<Value>,
responses(
(status = 200, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
(status = ACCEPTED, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
{
"taskUid": 147,
"indexUid": null,
@@ -1303,7 +1303,7 @@ impl Aggregate for EditDocumentsByFunctionAggregator {
),
request_body = DocumentEditionByFunction,
responses(
(status = 202, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
(status = ACCEPTED, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
{
"taskUid": 147,
"indexUid": null,
@@ -1401,7 +1401,7 @@ pub async fn edit_documents_by_function(
security(("Bearer" = ["documents.delete", "documents.*", "*"])),
params(("indexUid", example = "movies", description = "Index Unique Identifier", nullable = false)),
responses(
(status = 200, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
(status = ACCEPTED, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
{
"taskUid": 147,
"indexUid": null,

View File

@@ -231,7 +231,7 @@ impl Aggregate for IndexCreatedAggregate {
security(("Bearer" = ["indexes.create", "indexes.*", "*"])),
request_body = IndexCreateRequest,
responses(
(status = 200, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
(status = ACCEPTED, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
{
"taskUid": 147,
"indexUid": "movies",

View File

@@ -98,7 +98,7 @@ macro_rules! make_setting_route {
params(("indexUid", example = "movies", description = "Index Unique Identifier", nullable = false)),
request_body = $type,
responses(
(status = 200, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
(status = ACCEPTED, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
{
"taskUid": 147,
"indexUid": "movies",
@@ -162,7 +162,7 @@ macro_rules! make_setting_route {
params(("indexUid", example = "movies", description = "Index Unique Identifier", nullable = false)),
request_body = $type,
responses(
(status = 200, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
(status = ACCEPTED, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
{
"taskUid": 147,
"indexUid": "movies",
@@ -530,7 +530,7 @@ make_setting_routes!(
params(("indexUid", example = "movies", description = "Index Unique Identifier", nullable = false)),
request_body = Settings<Unchecked>,
responses(
(status = 200, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
(status = ACCEPTED, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
{
"taskUid": 147,
"indexUid": "movies",
@@ -680,7 +680,7 @@ pub async fn get_all(
security(("Bearer" = ["settings.update", "settings.*", "*"])),
params(("indexUid", example = "movies", description = "Index Unique Identifier", nullable = false)),
responses(
(status = 200, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
(status = ACCEPTED, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
{
"taskUid": 147,
"indexUid": "movies",

View File

@@ -73,7 +73,7 @@ impl Aggregate for IndexSwappedAnalytics {
security(("Bearer" = ["search", "*"])),
request_body = Vec<SwapIndexesPayload>,
responses(
(status = OK, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
(status = ACCEPTED, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
{
"taskUid": 3,
"indexUid": null,

View File

@@ -311,7 +311,7 @@ impl<Method: AggregateMethod + 'static> Aggregate for TaskFilterAnalytics<Method
security(("Bearer" = ["tasks.cancel", "tasks.*", "*"])),
params(TaskDeletionOrCancelationQuery),
responses(
(status = 200, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
(status = OK, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
{
"taskUid": 147,
"indexUid": null,
@@ -392,6 +392,7 @@ async fn cancel_tasks(
.await??;
let task: SummarizedTaskView = task.into();
// FIXME: This should be 202 Accepted, but changing it would be breaking so we need to wait for 2.0
Ok(HttpResponse::Ok().json(task))
}
@@ -405,7 +406,7 @@ async fn cancel_tasks(
security(("Bearer" = ["tasks.delete", "tasks.*", "*"])),
params(TaskDeletionOrCancelationQuery),
responses(
(status = 200, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
(status = OK, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
{
"taskUid": 147,
"indexUid": null,
@@ -485,6 +486,7 @@ async fn delete_tasks(
.await??;
let task: SummarizedTaskView = task.into();
// FIXME: This should be 202 Accepted, but changing it would be breaking so we need to wait for 2.0
Ok(HttpResponse::Ok().json(task))
}

View File

@@ -1274,7 +1274,7 @@ async fn test_summarized_batch_cancelation() {
#[actix_web::test]
async fn test_summarized_batch_deletion() {
let server = Server::new_shared();
let server = Server::new().await;
let index = server.unique_index();
// to avoid being flaky we're only going to delete an already finished batch :(
let (task, _status_code) = index.create(None).await;
@@ -1293,7 +1293,7 @@ async fn test_summarized_batch_deletion() {
".stats.writeChannelCongestion" => "[writeChannelCongestion]",
".details.originalFilter" => "?uids=X"
},
@r###"
@r#"
{
"uid": "[uid]",
"progress": null,
@@ -1318,7 +1318,7 @@ async fn test_summarized_batch_deletion() {
"finishedAt": "[date]",
"batchStrategy": "stopped after the last task of type `taskDeletion` because they cannot be batched with tasks of any other type."
}
"###);
"#);
}
#[actix_web::test]

View File

@@ -154,6 +154,24 @@ impl From<Vec<Value>> for Value {
}
}
pub trait IntoTaskUid {
fn uid(&self) -> u64;
}
impl IntoTaskUid for Value {
fn uid(&self) -> u64 {
self["taskUid"].as_u64().unwrap_or_else(|| {
panic!("Called `uid` on a Value that doesn't contain a taskUid: {self}")
})
}
}
impl IntoTaskUid for u64 {
fn uid(&self) -> u64 {
*self
}
}
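// With `IntoTaskUid` implemented for both `Value` and `u64`, test helpers such
// as `Server::wait_task` accept either a raw uid or the JSON response returned
// when the task was registered.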
#[macro_export]
macro_rules! json {
($($json:tt)+) => {

View File

@@ -426,7 +426,9 @@ impl<State> Server<State> {
self.service.delete(format!("/tasks?{}", value)).await
}
pub async fn wait_task(&self, update_id: u64) -> Value {
pub async fn wait_task(&self, update_id: impl super::IntoTaskUid) -> Value {
let update_id = update_id.uid();
// try several times to get status, or panic to not wait forever
let url = format!("/tasks/{update_id}");
let max_attempts = 400; // 200 seconds in total, 0.5secs per attempt

View File

@@ -0,0 +1,247 @@
use crate::common::Server;
use crate::json;
use crate::tasks::OffsetDateTime;
use meili_snap::{json_string, snapshot};
use time::format_description::well_known::Rfc3339;
use urlencoding::encode;
#[actix_rt::test]
async fn delete_task() {
let server = Server::new().await;
let index = server.unique_index();
// Add a document
let (task, code) = index
.add_documents(json!([{"id": 1, "free": "palestine", "asc_desc_rank": 1}]), Some("id"))
.await;
snapshot!(code, @r#"202 Accepted"#);
let task_uid = task["taskUid"].as_u64().unwrap();
server.wait_task(task).await.succeeded();
// Delete tasks
let (task, code) = server.delete_tasks(&format!("uids={task_uid}")).await;
snapshot!(code, @"200 OK");
let value = server.wait_task(task).await.succeeded();
snapshot!(value, @r#"
{
"uid": "[uid]",
"batchUid": "[batch_uid]",
"indexUid": null,
"status": "succeeded",
"type": "taskDeletion",
"canceledBy": null,
"details": {
"matchedTasks": 1,
"deletedTasks": 1,
"originalFilter": "?uids=0"
},
"error": null,
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
// Check that the task is deleted
let (value, code) = index.list_tasks().await;
snapshot!(code, @r#"200 OK"#);
snapshot!(value, @r#"
{
"results": [],
"total": 0,
"limit": 20,
"from": null,
"next": null
}
"#);
}
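// Shared body for the three time-bound deletion tests below; `name` is one of
// "EnqueuedAt", "StartedAt", or "FinishedAt".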
async fn delete_tasks_time_bounds_inner(name: &str) {
let server = Server::new().await;
let index = server.unique_index();
// Add documents
for i in 0..2 {
let (task, code) =
index.add_documents(json!([{"id": i, "country": "taiwan"}]), Some("id")).await;
snapshot!(code, @r#"202 Accepted"#);
server.wait_task(task).await.succeeded();
}
let time1 = OffsetDateTime::now_utc();
for i in 2..4 {
let (task, code) =
index.add_documents(json!([{"id": i, "country": "taiwan"}]), Some("id")).await;
snapshot!(code, @r#"202 Accepted"#);
server.wait_task(task).await.succeeded();
}
let time2 = OffsetDateTime::now_utc();
for i in 4..6 {
let (task, code) =
index.add_documents(json!([{"id": i, "country": "taiwan"}]), Some("id")).await;
snapshot!(code, @r#"202 Accepted"#);
server.wait_task(task).await.succeeded();
}
// Delete the middle tasks using the before{name} and after{name} filters
let (task, code) = server
.delete_tasks(&format!(
"before{name}={}&after{name}={}",
encode(&time2.format(&Rfc3339).unwrap()),
encode(&time1.format(&Rfc3339).unwrap()),
))
.await;
snapshot!(code, @"200 OK");
let value = server.wait_task(task).await.succeeded();
snapshot!(json_string!(value, {
".details.originalFilter" => "[ignored]",
".duration" => "[duration]",
".enqueuedAt" => "[date]",
".startedAt" => "[date]",
".finishedAt" => "[date]"
}), @r#"
{
"uid": 6,
"batchUid": 6,
"indexUid": null,
"status": "succeeded",
"type": "taskDeletion",
"canceledBy": null,
"details": {
"matchedTasks": 2,
"deletedTasks": 2,
"originalFilter": "[ignored]"
},
"error": null,
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"#);
// Check that the tasks were deleted
let (value, code) = server.tasks().await;
snapshot!(code, @r#"200 OK"#);
snapshot!(json_string!(value, {
".results[].duration" => "[duration]",
".results[].enqueuedAt" => "[date]",
".results[].startedAt" => "[date]",
".results[].finishedAt" => "[date]",
".results[].details.originalFilter" => "[ignored]"
}), @r#"
{
"results": [
{
"uid": 6,
"batchUid": 6,
"indexUid": null,
"status": "succeeded",
"type": "taskDeletion",
"canceledBy": null,
"details": {
"matchedTasks": 2,
"deletedTasks": 2,
"originalFilter": "[ignored]"
},
"error": null,
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
},
{
"uid": 5,
"batchUid": 5,
"indexUid": "[uuid]",
"status": "succeeded",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 1,
"indexedDocuments": 1
},
"error": null,
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
},
{
"uid": 4,
"batchUid": 4,
"indexUid": "[uuid]",
"status": "succeeded",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 1,
"indexedDocuments": 1
},
"error": null,
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
},
{
"uid": 1,
"batchUid": 1,
"indexUid": "[uuid]",
"status": "succeeded",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 1,
"indexedDocuments": 1
},
"error": null,
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
},
{
"uid": 0,
"batchUid": 0,
"indexUid": "[uuid]",
"status": "succeeded",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 1,
"indexedDocuments": 1
},
"error": null,
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
],
"total": 5,
"limit": 20,
"from": 6,
"next": null
}
"#);
}
#[actix_rt::test]
async fn delete_tasks_enqueued() {
delete_tasks_time_bounds_inner("EnqueuedAt").await;
}
#[actix_rt::test]
async fn delete_tasks_started() {
delete_tasks_time_bounds_inner("StartedAt").await;
}
#[actix_rt::test]
async fn delete_tasks_finished() {
delete_tasks_time_bounds_inner("FinishedAt").await;
}

View File

@@ -1,3 +1,4 @@
mod deletion;
mod errors;
mod webhook;

View File

@@ -3,16 +3,17 @@ mod v1_13;
mod v1_14;
mod v1_15;
mod v1_16;
mod v1_17;
use heed::RwTxn;
use v1_12::{V1_12_3_To_V1_13_0, V1_12_To_V1_12_3};
use v1_13::{V1_13_0_To_V1_13_1, V1_13_1_To_Latest_V1_13};
use v1_14::Latest_V1_13_To_Latest_V1_14;
use v1_15::Latest_V1_14_To_Latest_V1_15;
use v1_16::Latest_V1_16_To_V1_17_0;
use v1_16::Latest_V1_15_To_V1_16_0;
use v1_17::Latest_V1_16_To_V1_17_0;
use crate::constants::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
use crate::progress::{Progress, VariableNameStep};
use crate::update::upgrade::v1_16::Latest_V1_15_To_V1_16_0;
use crate::{Index, InternalError, Result};
trait UpgradeIndex {

View File

@@ -46,22 +46,3 @@ impl UpgradeIndex for Latest_V1_15_To_V1_16_0 {
(1, 16, 0)
}
}
#[allow(non_camel_case_types)]
pub(super) struct Latest_V1_16_To_V1_17_0();
impl UpgradeIndex for Latest_V1_16_To_V1_17_0 {
fn upgrade(
&self,
_wtxn: &mut RwTxn,
_index: &Index,
_original: (u32, u32, u32),
_progress: Progress,
) -> Result<bool> {
Ok(false)
}
fn target_version(&self) -> (u32, u32, u32) {
(1, 17, 0)
}
}

View File

@@ -0,0 +1,24 @@
use heed::RwTxn;
use super::UpgradeIndex;
use crate::progress::Progress;
use crate::{Index, Result};
#[allow(non_camel_case_types)]
pub(super) struct Latest_V1_16_To_V1_17_0();
impl UpgradeIndex for Latest_V1_16_To_V1_17_0 {
fn upgrade(
&self,
_wtxn: &mut RwTxn,
_index: &Index,
_original: (u32, u32, u32),
_progress: Progress,
) -> Result<bool> {
Ok(false)
}
fn target_version(&self) -> (u32, u32, u32) {
(1, 17, 0)
}
}