mirror of https://github.com/meilisearch/meilisearch.git
synced 2025-11-22 12:46:53 +00:00

Compare commits: v1.24.0...tasks-du-s (58 commits)
| SHA1 |
|---|
| 7c2d478ef3 |
| 0c4cc4a00a |
| 28ac767f42 |
| 131823e4c5 |
| ae73465eab |
| 1f7689a3fb |
| 0b89783a3f |
| 808b9c71fa |
| 8990f2a14a |
| fe275397ec |
| d7776ec82b |
| 84244a74df |
| 3b470af54c |
| 702a265888 |
| 7eb871d671 |
| eec711d93e |
| 323d958bf6 |
| e9007effea |
| e0f5e05b22 |
| f54a15606e |
| fee2fed044 |
| 72ea3efdd5 |
| e1fa79e92a |
| 915cd1f108 |
| e5188699f9 |
| 1d46cb30f2 |
| f5bd405935 |
| a156db4f28 |
| 0e9ac3ca92 |
| a54d7d2269 |
| 60a1188a58 |
| 62ad7b345a |
| 16c57414ad |
| e23bf92502 |
| 3fc09107ec |
| 68ad9d6021 |
| 8cdf65bbbc |
| 05c0b6ee6e |
| 1cb9816f44 |
| 6f8d788aa8 |
| 6a99c5b2f3 |
| cb36257537 |
| ba0f0c3c30 |
| 39eebac7e5 |
| 4c61d3a939 |
| eb8ff31513 |
| 418730ef73 |
| 9a29a7790b |
| 14cb1bbbfb |
| 31c4215ad2 |
| ff68802ffc |
| 2d479332d3 |
| ea894e6a2c |
| 4ae43eb51a |
| 5309a37fb0 |
| 498b0b1419 |
| 0e9584672c |
| 73e82d67d7 |
@@ -264,6 +264,7 @@ pub(crate) mod test {
    use uuid::Uuid;

    use crate::reader::Document;
+    use crate::writer::BatchWriter;
    use crate::{DumpReader, DumpWriter, IndexMetadata, KindDump, TaskDump, Version};

    pub fn create_test_instance_uid() -> Uuid {
@@ -468,7 +469,7 @@ pub(crate) mod test {
        ]
    }

-    pub fn create_test_dump() -> File {
+    pub fn create_test_dump_writer() -> (DumpWriter, BatchWriter) {
        let instance_uid = create_test_instance_uid();
        let dump = DumpWriter::new(Some(instance_uid)).unwrap();
@@ -490,7 +491,6 @@ pub(crate) mod test {
        for batch in &batches {
            batch_queue.push_batch(batch).unwrap();
        }
-        batch_queue.flush().unwrap();

        // ========== pushing the task queue
        let tasks = create_test_tasks();
@@ -524,6 +524,13 @@ pub(crate) mod test {
        let network = create_test_network();
        dump.create_network(network).unwrap();

+        (dump, batch_queue)
+    }
+
+    pub fn create_test_dump() -> File {
+        let (dump, batch_writer) = create_test_dump_writer();
+        batch_writer.flush().unwrap();
+
        // create the dump
        let mut file = tempfile::tempfile().unwrap();
        dump.persist_to(&mut file).unwrap();
@@ -229,12 +229,56 @@ impl From<CompatIndexV5ToV6> for DumpIndexReader {

#[cfg(test)]
pub(crate) mod test {
-    use std::fs::File;
+    use std::{fs::File, io::Seek};

    use meili_snap::insta;
+    use meilisearch_types::{
+        batches::{Batch, BatchEnqueuedAt, BatchStats},
+        task_view::DetailsView,
+        tasks::{BatchStopReason, Kind, Status},
+    };
    use time::macros::datetime;

    use super::*;
-    use crate::reader::v6::RuntimeTogglableFeatures;
+    use crate::{reader::v6::RuntimeTogglableFeatures, test::create_test_dump_writer};

+    #[test]
+    fn import_dump_with_bad_batches() {
+        let (dump, mut batch_writer) = create_test_dump_writer();
+        let bad_batch = Batch {
+            uid: 1,
+            progress: None,
+            details: DetailsView::default(),
+            stats: BatchStats {
+                total_nb_tasks: 1,
+                status: maplit::btreemap! { Status::Succeeded => 666 },
+                types: maplit::btreemap! { Kind::DocumentAdditionOrUpdate => 666 },
+                index_uids: maplit::btreemap! { "doggo".to_string() => 666 },
+                progress_trace: Default::default(),
+                write_channel_congestion: None,
+                internal_database_sizes: Default::default(),
+            },
+            embedder_stats: Default::default(),
+            enqueued_at: Some(BatchEnqueuedAt {
+                earliest: datetime!(2022-11-11 0:00 UTC),
+                oldest: datetime!(2022-11-11 0:00 UTC),
+            }),
+            started_at: datetime!(2022-11-20 0:00 UTC),
+            finished_at: Some(datetime!(2022-11-21 0:00 UTC)),
+            stop_reason: BatchStopReason::Unspecified.to_string(),
+        };
+        batch_writer.push_batch(&bad_batch).unwrap();
+        batch_writer.flush().unwrap();
+
+        let mut file = tempfile::tempfile().unwrap();
+        dump.persist_to(&mut file).unwrap();
+        file.rewind().unwrap();
+
+        let mut dump = DumpReader::open(file).unwrap();
+        let read_batches = dump.batches().unwrap().map(|b| b.unwrap()).collect::<Vec<_>>();
+
+        assert!(!read_batches.iter().any(|b| b.uid == 1));
+    }
+
    #[test]
    fn import_dump_v6_with_vectors() {
@@ -5,6 +5,7 @@ use std::path::Path;

pub use meilisearch_types::milli;
use meilisearch_types::milli::vector::hf::OverridePooling;
+use roaring::RoaringBitmap;
use tempfile::TempDir;
use time::OffsetDateTime;
use tracing::debug;
@@ -56,6 +57,7 @@ pub struct V6Reader {
    instance_uid: Option<Uuid>,
    metadata: Metadata,
    tasks: BufReader<File>,
+    tasks2: BufReader<File>,
    batches: Option<BufReader<File>>,
    keys: BufReader<File>,
    features: Option<RuntimeTogglableFeatures>,
@@ -122,6 +124,7 @@ impl V6Reader {
            metadata: serde_json::from_reader(&*meta_file)?,
            instance_uid,
            tasks: BufReader::new(File::open(dump.path().join("tasks").join("queue.jsonl"))?),
+            tasks2: BufReader::new(File::open(dump.path().join("tasks").join("queue.jsonl"))?),
            batches,
            keys: BufReader::new(File::open(dump.path().join("keys.jsonl"))?),
            features,
@@ -187,12 +190,48 @@ impl V6Reader {
        }))
    }

+    fn tasks2(&mut self) -> Box<dyn Iterator<Item = Result<Task>> + '_> {
+        Box::new(
+            (&mut self.tasks2)
+                .lines()
+                .map(|line| -> Result<_> { Ok(serde_json::from_str(&line?)?) }),
+        )
+    }
+
    pub fn batches(&mut self) -> Box<dyn Iterator<Item = Result<Batch>> + '_> {
+        // Get the batches, but filter out those whose tasks have all been deleted.
+        // This is due to bug #5827, which caused them not to be deleted before version 1.18.
+
+        let mut task_uids = RoaringBitmap::new();
+        let mut faulty = false;
+        for task in self.tasks2() {
+            let Ok(task) = task else {
+                // If we can't read the tasks, just give up trying to filter the batches.
+                // The database may contain phantom batches, but that's not that big of a deal.
+                faulty = true;
+                break;
+            };
+            task_uids.insert(task.uid);
+        }
        match self.batches.as_mut() {
-            Some(batches) => Box::new((batches).lines().map(|line| -> Result<_> {
-                let batch = serde_json::from_str(&line?)?;
-                Ok(batch)
-            })),
+            Some(batches) => Box::new(
+                (batches)
+                    .lines()
+                    .map(|line| -> Result<_> {
+                        let batch: meilisearch_types::batches::Batch =
+                            serde_json::from_str(&line?)?;
+                        Ok(batch)
+                    })
+                    .filter(move |batch| match batch {
+                        Ok(batch) => {
+                            faulty
+                                || batch.stats.status.values().any(|t| task_uids.contains(*t))
+                                || batch.stats.types.values().any(|t| task_uids.contains(*t))
+                                || batch.stats.index_uids.values().any(|t| task_uids.contains(*t))
+                        }
+                        Err(_) => true,
+                    }),
+            ),
            None => Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Result<Batch>> + '_>,
        }
    }
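The filter above keeps a batch only when the values recorded in its stats maps still resolve to surviving task uids, or when the task queue could not be read at all. A minimal sketch of that predicate, with `BatchStats` simplified to a plain list of task uids instead of the real per-status/per-type/per-index maps (an assumption made for brevity):

```rust
use roaring::RoaringBitmap;

// Simplified stand-in for the real `BatchStats` maps.
struct BatchStats {
    task_uids: Vec<u32>,
}

// Keep a batch only if at least one task it references still exists,
// unless the task file was unreadable (`faulty`), in which case keep everything.
fn keep_batch(live_tasks: &RoaringBitmap, stats: &BatchStats, faulty: bool) -> bool {
    faulty || stats.task_uids.iter().any(|t| live_tasks.contains(*t))
}

fn main() {
    let mut live_tasks = RoaringBitmap::new();
    live_tasks.insert(42);

    let live = BatchStats { task_uids: vec![42] };
    let phantom = BatchStats { task_uids: vec![7] }; // all of its tasks were deleted
    assert!(keep_batch(&live_tasks, &live, false));
    assert!(!keep_batch(&live_tasks, &phantom, false));
    assert!(keep_batch(&live_tasks, &phantom, true)); // unreadable queue: filter nothing
}
```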
@@ -727,9 +727,7 @@ impl IndexScheduler {
        // If the registered task is a task cancelation
        // we inform the processing tasks to stop (if necessary).
        if let KindWithContent::TaskCancelation { tasks, .. } = kind {
-            let tasks_to_cancel = RoaringBitmap::from_iter(tasks);
-            if self.processing_tasks.read().unwrap().must_cancel_processing_tasks(&tasks_to_cancel)
-            {
+            if self.processing_tasks.read().unwrap().must_cancel_processing_tasks(&tasks) {
                self.scheduler.must_stop_processing.must_stop();
            }
        }
@@ -81,8 +81,12 @@ make_enum_progress! {

make_enum_progress! {
    pub enum TaskDeletionProgress {
+        RetrievingTasks,
+        RetrievingBatchTasks,
        DeletingTasksDateTime,
+        DeletingBatchesDateTime,
        DeletingTasksMetadata,
+        DeletingBatchesMetadata,
        DeletingTasks,
        DeletingBatches,
    }
@@ -66,7 +66,7 @@ impl BatchQueue {
        NUMBER_OF_DATABASES
    }

-    pub(super) fn new(env: &Env<WithoutTls>, wtxn: &mut RwTxn) -> Result<Self> {
+    pub(crate) fn new(env: &Env<WithoutTls>, wtxn: &mut RwTxn) -> Result<Self> {
        Ok(Self {
            all_batches: env.create_database(wtxn, Some(db_name::ALL_BATCHES))?,
            status: env.create_database(wtxn, Some(db_name::BATCH_STATUS))?,
@@ -127,7 +127,12 @@ impl BatchQueue {
        status: Status,
        bitmap: &RoaringBitmap,
    ) -> Result<()> {
-        Ok(self.status.put(wtxn, &status, bitmap)?)
+        if bitmap.is_empty() {
+            self.status.delete(wtxn, &status)?;
+        } else {
+            self.status.put(wtxn, &status, bitmap)?;
+        }
+        Ok(())
    }

    pub(crate) fn update_status(
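The `put_status` change guards against persisting empty bitmaps: writing an empty value would keep the status key alive in the database and make it look like tasks still exist under it. A rough sketch of the invariant, with a `BTreeMap` standing in for the heed database (the names here are illustrative, not the real API):

```rust
use std::collections::BTreeMap;

// Stand-in for the LMDB `status` database: status name -> set of task ids.
fn put_status(db: &mut BTreeMap<String, Vec<u32>>, status: &str, bitmap: &[u32]) {
    if bitmap.is_empty() {
        // Storing an empty set would leave a stale key behind; delete it instead.
        db.remove(status);
    } else {
        db.insert(status.to_string(), bitmap.to_vec());
    }
}

fn main() {
    let mut db = BTreeMap::new();
    put_status(&mut db, "succeeded", &[1, 2, 3]);
    put_status(&mut db, "succeeded", &[]);
    assert!(!db.contains_key("succeeded")); // the key is gone, not merely empty
}
```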
@@ -32,7 +32,7 @@ use crate::{Error, IndexSchedulerOptions, Result, TaskId};
/// The number of databases used by the queue itself.
const NUMBER_OF_DATABASES: u32 = 1;
/// Database const names for the `IndexScheduler`.
-mod db_name {
+pub(crate) mod db_name {
    pub const BATCH_TO_TASKS_MAPPING: &str = "batch-to-tasks-mapping";
}
@@ -383,7 +383,11 @@ impl Queue {
            // tasks that are not processing. The non-processing ones are filtered normally while the processing ones
            // are entirely removed unless the in-memory startedAt variable falls within the date filter.
            // Once we have filtered the two subsets, we put them back together and assign it back to `tasks`.
-            tasks = {
+            tasks = 'started_at: {
+                if after_started_at.is_none() && before_started_at.is_none() {
+                    break 'started_at tasks;
+                }

                let (mut filtered_non_processing_tasks, mut filtered_processing_tasks) =
                    (&tasks - &**processing_tasks, &tasks & &**processing_tasks);
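The `'started_at:` rewrite uses a Rust labeled block expression (stable since 1.65): the block can `break` early with a value instead of wrapping the whole filtering step in another `if`. A standalone sketch of the pattern, with hypothetical names:

```rust
fn main() {
    let filter: Option<u32> = None;

    // A labeled block can `break` with a value, which is handy for
    // early exits that skip expensive filtering work.
    let tasks = 'filtering: {
        let tasks = vec![1, 2, 3];
        if filter.is_none() {
            break 'filtering tasks; // nothing to filter, return the set as-is
        }
        tasks.into_iter().filter(|t| Some(*t) == filter).collect()
    };

    assert_eq!(tasks, vec![1, 2, 3]);
}
```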
@@ -2,14 +2,14 @@ use std::collections::{BTreeSet, HashMap, HashSet};
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::atomic::Ordering;

-use meilisearch_types::batches::{BatchEnqueuedAt, BatchId};
-use meilisearch_types::heed::{RoTxn, RwTxn};
+use meilisearch_types::batches::BatchId;
+use meilisearch_types::heed::{Database, RoTxn, RwTxn};
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
-use meilisearch_types::milli::{self, ChannelCongestion};
+use meilisearch_types::milli::{self, CboRoaringBitmapCodec, ChannelCongestion};
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
use milli::update::Settings as MilliSettings;
-use roaring::RoaringBitmap;
+use roaring::{MultiOps, RoaringBitmap};
use time::OffsetDateTime;

use super::create_batch::Batch;
@@ -18,11 +18,8 @@ use crate::processing::{
    InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress,
    UpdateIndexProgress,
};
-use crate::utils::{
-    self, remove_n_tasks_datetime_earlier_than, remove_task_datetime, swap_index_uid_in_task,
-    ProcessingBatch,
-};
-use crate::{Error, IndexScheduler, Result, TaskId};
+use crate::utils::{consecutive_ranges, swap_index_uid_in_task, ProcessingBatch};
+use crate::{Error, IndexScheduler, Result, TaskId, BEI128};

#[derive(Debug, Default)]
pub struct ProcessBatchInfo {
@@ -102,10 +99,7 @@ impl IndexScheduler {
            }
        }

-        let mut wtxn = self.env.write_txn()?;
-        let mut deleted_tasks =
-            self.delete_matched_tasks(&mut wtxn, &matched_tasks, &progress)?;
-        wtxn.commit()?;
+        let mut deleted_tasks = self.delete_matched_tasks(&matched_tasks, &progress)?;

        for task in tasks.iter_mut() {
            task.status = Status::Succeeded;
@@ -127,6 +121,12 @@ impl IndexScheduler {
                    _ => unreachable!(),
                }
            }

+            debug_assert!(
+                deleted_tasks.is_empty(),
+                "There should be no tasks left to delete after processing the batch"
+            );
+
            Ok((tasks, ProcessBatchInfo::default()))
        }
        Batch::SnapshotCreation(tasks) => self
@@ -567,102 +567,339 @@ impl IndexScheduler {
    /// Delete each given task from all the databases (if it is deleteable).
    ///
    /// Return the number of tasks that were actually deleted.
+    #[allow(clippy::reversed_empty_ranges)]
    fn delete_matched_tasks(
        &self,
-        wtxn: &mut RwTxn,
        matched_tasks: &RoaringBitmap,
        progress: &Progress,
    ) -> Result<RoaringBitmap> {
-        progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime);
+        fn remove_task_datetimes(
+            wtxn: &mut RwTxn<'_>,
+            mut to_remove: HashMap<i128, RoaringBitmap>,
+            db: Database<BEI128, CboRoaringBitmapCodec>,
+        ) -> Result<()> {
+            if to_remove.is_empty() {
+                return Ok(());
+            }
+
+            let min = to_remove.keys().min().cloned().unwrap(); // to_remove isn't empty so this is ok
+            let max = to_remove.keys().max().cloned().unwrap();
+            let range = min..=max;
+
+            // We iterate over the time database to see which ranges of timestamps need to be removed
+            let lazy_db = db.lazily_decode_data();
+            let iter = lazy_db.range(wtxn, &range)?;
+            let mut delete_range_start = None;
+            let mut delete_ranges = Vec::new();
+            let mut to_put: HashMap<i128, RoaringBitmap> = HashMap::new();
+            for i in iter {
+                let (timestamp, data) = i?;
+
+                if let Some(to_remove) = to_remove.remove(&timestamp) {
+                    let mut current =
+                        data.decode().map_err(|e| Error::Anyhow(anyhow::anyhow!(e)))?;
+                    current -= &to_remove;
+
+                    if current.is_empty() {
+                        if delete_range_start.is_none() {
+                            delete_range_start = Some(timestamp);
+                        }
+                    } else {
+                        // We could close the deletion range but it's not necessary because the new value will get reinserted anyway
+                        to_put.insert(timestamp, current);
+                    }
+                } else if let Some(delete_range_start) = delete_range_start.take() {
+                    // Current one must not be deleted so we need to skip it
+                    delete_ranges.push(delete_range_start..timestamp);
+                }
+            }
+            if let Some(delete_range_start) = delete_range_start.take() {
+                delete_ranges.push(delete_range_start..(max + 1));
+            }
+
+            for range in delete_ranges {
+                db.delete_range(wtxn, &range)?;
+            }
+
+            for (timestamp, data) in to_put {
+                db.put(wtxn, &timestamp, &data)?;
+            }
+
+            Ok(())
+        }
+
+        fn remove_batch_datetimes(
+            wtxn: &mut RwTxn<'_>,
+            to_remove: &RoaringBitmap,
+            db: Database<BEI128, CboRoaringBitmapCodec>,
+        ) -> Result<()> {
+            if to_remove.is_empty() {
+                return Ok(());
+            }
+
+            // We iterate over the time database to see which ranges of timestamps need to be removed
+            let iter = db.iter(wtxn)?;
+            let mut delete_range_start = None;
+            let mut delete_ranges = Vec::new();
+            let mut to_put: HashMap<i128, RoaringBitmap> = HashMap::new();
+            for i in iter {
+                let (timestamp, mut current) = i?;
+
+                if !current.is_disjoint(to_remove) {
+                    current -= to_remove;
+
+                    if current.is_empty() {
+                        if delete_range_start.is_none() {
+                            delete_range_start = Some(timestamp);
+                        }
+                    } else {
+                        // We could close the deletion range but it's not necessary because the new value will get reinserted anyway
+                        to_put.insert(timestamp, current);
+                    }
+                } else if let Some(delete_range_start) = delete_range_start.take() {
+                    // Current one must not be deleted so we need to skip it
+                    delete_ranges.push(delete_range_start..timestamp);
+                }
+            }
+            if let Some(delete_range_start) = delete_range_start.take() {
+                delete_ranges.push(delete_range_start..i128::MAX);
+            }
+
+            for range in delete_ranges {
+                db.delete_range(wtxn, &range)?;
+            }
+
+            for (timestamp, data) in to_put {
+                db.put(wtxn, &timestamp, &data)?;
+            }
+
+            Ok(())
+        }
+
+        progress.update_progress(TaskDeletionProgress::RetrievingTasks);
+
+        let rtxn = self.env.read_txn()?;
+
        // 1. Remove from this list the tasks that we are not allowed to delete
-        let enqueued_tasks = self.queue.tasks.get_status(wtxn, Status::Enqueued)?;
        let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone();
-
-        let all_task_ids = self.queue.tasks.all_task_ids(wtxn)?;
-        let mut to_delete_tasks = all_task_ids & matched_tasks;
+        let mut status_tasks = HashMap::new();
+        for status in enum_iterator::all::<Status>() {
+            status_tasks.insert(status, self.queue.tasks.get_status(&rtxn, status)?);
+        }
+        let enqueued_tasks = status_tasks.get(&Status::Enqueued).unwrap(); // We added all statuses
+        let all_task_ids = status_tasks.values().union();
+        let mut to_remove_from_statuses = HashMap::new();
+        let mut to_delete_tasks = all_task_ids.clone() & matched_tasks;
        to_delete_tasks -= &**processing_tasks;
-        to_delete_tasks -= &enqueued_tasks;
+        to_delete_tasks -= enqueued_tasks;

-        // 2. We now have a list of tasks to delete, delete them
+        // 2. We now have a list of tasks to delete. Read their metadata to list what needs to be updated.
        let mut affected_indexes = HashSet::new();
        let mut affected_statuses = HashSet::new();
        let mut affected_kinds = HashSet::new();
        let mut affected_canceled_by = RoaringBitmap::new();
-        // The tasks that have been removed *per batches*.
-        let mut affected_batches: HashMap<BatchId, RoaringBitmap> = HashMap::new();
-
+        let mut affected_batches: HashMap<BatchId, RoaringBitmap> = HashMap::new(); // The tasks that have been removed *per batches*.
+        let mut tasks_enqueued_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
+        let mut tasks_started_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
+        let mut tasks_finished_to_remove: HashMap<i128, RoaringBitmap> = HashMap::new();
        let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32);
        progress.update_progress(task_progress);
-        for task_id in to_delete_tasks.iter() {
-            let task =
-                self.queue.tasks.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
+        for range in consecutive_ranges(to_delete_tasks.iter()) {
+            let iter = self.queue.tasks.all_tasks.range(&rtxn, &range)?;
+            for task in iter {
+                let (task_id, task) = task?;

-            affected_indexes.extend(task.indexes().into_iter().map(|x| x.to_owned()));
-            affected_statuses.insert(task.status);
-            affected_kinds.insert(task.kind.as_kind());
-            // Note: don't delete the persisted task data since
-            // we can only delete succeeded, failed, and canceled tasks.
-            // In each of those cases, the persisted data is supposed to
-            // have been deleted already.
-            utils::remove_task_datetime(
-                wtxn,
-                self.queue.tasks.enqueued_at,
-                task.enqueued_at,
-                task.uid,
-            )?;
-            if let Some(started_at) = task.started_at {
-                utils::remove_task_datetime(
-                    wtxn,
-                    self.queue.tasks.started_at,
-                    started_at,
-                    task.uid,
-                )?;
+                affected_indexes.extend(task.indexes().into_iter().map(|x| x.to_owned()));
+                affected_statuses.insert(task.status);
+                affected_kinds.insert(task.kind.as_kind());
+
+                let enqueued_at = task.enqueued_at.unix_timestamp_nanos();
+                tasks_enqueued_to_remove.entry(enqueued_at).or_default().insert(task_id);
+
+                if let Some(started_at) = task.started_at {
+                    tasks_started_to_remove
+                        .entry(started_at.unix_timestamp_nanos())
+                        .or_default()
+                        .insert(task_id);
+                }
+
+                if let Some(finished_at) = task.finished_at {
+                    tasks_finished_to_remove
+                        .entry(finished_at.unix_timestamp_nanos())
+                        .or_default()
+                        .insert(task_id);
+                }
+
+                if let Some(canceled_by) = task.canceled_by {
+                    affected_canceled_by.insert(canceled_by);
+                }
+                if let Some(batch_uid) = task.batch_uid {
+                    affected_batches.entry(batch_uid).or_default().insert(task_id);
+                }
+                atomic_progress.fetch_add(1, Ordering::Relaxed);
            }
-            if let Some(finished_at) = task.finished_at {
-                utils::remove_task_datetime(
-                    wtxn,
-                    self.queue.tasks.finished_at,
-                    finished_at,
-                    task.uid,
-                )?;
-            }
-            if let Some(canceled_by) = task.canceled_by {
-                affected_canceled_by.insert(canceled_by);
-            }
-            if let Some(batch_uid) = task.batch_uid {
-                affected_batches.entry(batch_uid).or_default().insert(task_id);
-            }
-            atomic_progress.fetch_add(1, Ordering::Relaxed);
        }

+        // 3. Read the tasks by indexes, statuses and kinds
+        let mut affected_indexes_tasks = HashMap::new();
+        for index in &affected_indexes {
+            affected_indexes_tasks
+                .insert(index.as_str(), self.queue.tasks.index_tasks(&rtxn, index)?);
+        }
+        let mut to_remove_from_indexes = HashMap::new();
+
+        let mut affected_kinds_tasks = HashMap::new();
+        for kind in &affected_kinds {
+            affected_kinds_tasks.insert(*kind, self.queue.tasks.get_kind(&rtxn, *kind)?);
+        }
+        let mut to_remove_from_kinds = HashMap::new();
+
+        // 4. Read affected batches' tasks
+        let mut to_delete_batches = RoaringBitmap::new();
+        let affected_batches_bitmap = RoaringBitmap::from_iter(affected_batches.keys());
+        progress.update_progress(TaskDeletionProgress::RetrievingBatchTasks);
+        let (atomic_progress, task_progress) =
+            AtomicBatchStep::new(affected_batches_bitmap.len() as u32);
+        progress.update_progress(task_progress);
+        for range in consecutive_ranges(affected_batches_bitmap.iter()) {
+            let iter = self.queue.batch_to_tasks_mapping.range(&rtxn, &range)?;
+            for batch in iter {
+                let (batch_id, mut tasks) = batch?;
+                let to_delete_tasks = affected_batches.remove(&batch_id).unwrap_or_default();
+                tasks -= &to_delete_tasks;
+
+                // Note: we never delete tasks from the mapping. It's error-prone but intentional (perf)
+                // We make sure to filter the tasks from the mapping when we read them.
+                tasks &= &all_task_ids;
+
+                // We must remove the batch entirely
+                if tasks.is_empty() {
+                    to_delete_batches.insert(batch_id);
+                }
+
+                // We must remove the batch from all the reverse indexes it no longer has tasks for.
+
+                for (index, index_tasks) in affected_indexes_tasks.iter() {
+                    if index_tasks.is_disjoint(&tasks) {
+                        to_remove_from_indexes
+                            .entry(index)
+                            .or_insert_with(RoaringBitmap::new)
+                            .insert(batch_id);
+                    }
+                }
+
+                for (status, status_tasks) in status_tasks.iter() {
+                    if status_tasks.is_disjoint(&tasks) {
+                        to_remove_from_statuses
+                            .entry(*status)
+                            .or_insert_with(RoaringBitmap::new)
+                            .insert(batch_id);
+                    }
+                }
+
+                for (kind, kind_tasks) in affected_kinds_tasks.iter() {
+                    if kind_tasks.is_disjoint(&tasks) {
+                        to_remove_from_kinds
+                            .entry(*kind)
+                            .or_insert_with(RoaringBitmap::new)
+                            .insert(batch_id);
+                    }
+                }
+
+                // Note: no need to delete the persisted task data since
+                // we can only delete succeeded, failed, and canceled tasks.
+                // In each of those cases, the persisted data is supposed to
+                // have been deleted already.
+
+                atomic_progress.fetch_add(1, Ordering::Relaxed);
+            }
+        }
+
+        drop(rtxn);
+        let mut owned_wtxn = self.env.write_txn()?;
+        let wtxn = &mut owned_wtxn;
+
+        // 7. Remove task datetimes
+        progress.update_progress(TaskDeletionProgress::DeletingTasksDateTime);
+        remove_task_datetimes(wtxn, tasks_enqueued_to_remove, self.queue.tasks.enqueued_at)?;
+        remove_task_datetimes(wtxn, tasks_started_to_remove, self.queue.tasks.started_at)?;
+        remove_task_datetimes(wtxn, tasks_finished_to_remove, self.queue.tasks.finished_at)?;
+
+        // 8. Delete batches datetimes
+        progress.update_progress(TaskDeletionProgress::DeletingBatchesDateTime);
+        remove_batch_datetimes(wtxn, &to_delete_batches, self.queue.batches.enqueued_at)?;
+        remove_batch_datetimes(wtxn, &to_delete_batches, self.queue.batches.started_at)?;
+        remove_batch_datetimes(wtxn, &to_delete_batches, self.queue.batches.finished_at)?;
+
+        // 9. Remove batches metadata from indexes, statuses, and kinds
+        progress.update_progress(TaskDeletionProgress::DeletingBatchesMetadata);
+
+        for (index, batches) in to_remove_from_indexes {
+            self.queue.batches.update_index(wtxn, index, |b| {
+                *b -= &batches;
+            })?;
+        }
+
+        for (status, batches) in to_remove_from_statuses {
+            self.queue.batches.update_status(wtxn, status, |b| {
+                *b -= &batches;
+            })?;
+        }
+
+        for (kind, batches) in to_remove_from_kinds {
+            self.queue.batches.update_kind(wtxn, kind, |b| {
+                *b -= &batches;
+            })?;
+        }
+
+        // 10. Remove tasks from indexes, statuses, and kinds
        progress.update_progress(TaskDeletionProgress::DeletingTasksMetadata);
        let (atomic_progress, task_progress) = AtomicTaskStep::new(
            (affected_indexes.len() + affected_statuses.len() + affected_kinds.len()) as u32,
        );
        progress.update_progress(task_progress);
-        for index in affected_indexes.iter() {
-            self.queue.tasks.update_index(wtxn, index, |bitmap| *bitmap -= &to_delete_tasks)?;
+        for (index, mut tasks) in affected_indexes_tasks.into_iter() {
+            tasks -= &to_delete_tasks;
+            if tasks.is_empty() {
+                self.queue.tasks.index_tasks.delete(wtxn, index)?;
+            } else {
+                self.queue.tasks.index_tasks.put(wtxn, index, &tasks)?;
+            }
            atomic_progress.fetch_add(1, Ordering::Relaxed);
        }

-        for status in affected_statuses.iter() {
-            self.queue.tasks.update_status(wtxn, *status, |bitmap| *bitmap -= &to_delete_tasks)?;
+        for status in affected_statuses.into_iter() {
+            let mut tasks = status_tasks.remove(&status).unwrap(); // we inserted all statuses above
+            tasks -= &to_delete_tasks;
+            if tasks.is_empty() {
+                self.queue.tasks.status.delete(wtxn, &status)?;
+            } else {
+                self.queue.tasks.status.put(wtxn, &status, &tasks)?;
+            }
            atomic_progress.fetch_add(1, Ordering::Relaxed);
        }

-        for kind in affected_kinds.iter() {
-            self.queue.tasks.update_kind(wtxn, *kind, |bitmap| *bitmap -= &to_delete_tasks)?;
+        for (kind, mut tasks) in affected_kinds_tasks.into_iter() {
+            tasks -= &to_delete_tasks;
+            if tasks.is_empty() {
+                self.queue.tasks.kind.delete(wtxn, &kind)?;
+            } else {
+                self.queue.tasks.kind.put(wtxn, &kind, &tasks)?;
+            }
            atomic_progress.fetch_add(1, Ordering::Relaxed);
        }

+        // 11. Delete tasks
        progress.update_progress(TaskDeletionProgress::DeletingTasks);
-        let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_tasks.len() as u32);
+        let (atomic_progress, task_progress) =
+            AtomicTaskStep::new((to_delete_tasks.len() + affected_canceled_by.len()) as u32);
        progress.update_progress(task_progress);
-        for task in to_delete_tasks.iter() {
-            self.queue.tasks.all_tasks.delete(wtxn, &task)?;
-            atomic_progress.fetch_add(1, Ordering::Relaxed);
+        for range in consecutive_ranges(to_delete_tasks.iter()) {
+            self.queue.tasks.all_tasks.delete_range(wtxn, &range)?;
+            atomic_progress.fetch_add(range.size_hint().0 as u32, Ordering::Relaxed);
        }

        for canceled_by in affected_canceled_by {
            if let Some(mut tasks) = self.queue.tasks.canceled_by.get(wtxn, &canceled_by)? {
                tasks -= &to_delete_tasks;
@@ -672,96 +909,21 @@ impl IndexScheduler {
                self.queue.tasks.canceled_by.put(wtxn, &canceled_by, &tasks)?;
                }
            }
        }
-        progress.update_progress(TaskDeletionProgress::DeletingBatches);
-        let (atomic_progress, batch_progress) = AtomicBatchStep::new(affected_batches.len() as u32);
-        progress.update_progress(batch_progress);
-        for (batch_id, to_delete_tasks) in affected_batches {
-            if let Some(mut tasks) = self.queue.batch_to_tasks_mapping.get(wtxn, &batch_id)? {
-                tasks -= &to_delete_tasks;
-                // We must remove the batch entirely
-                if tasks.is_empty() {
-                    if let Some(batch) = self.queue.batches.get_batch(wtxn, batch_id)? {
-                        if let Some(BatchEnqueuedAt { earliest, oldest }) = batch.enqueued_at {
-                            remove_task_datetime(
-                                wtxn,
-                                self.queue.batches.enqueued_at,
-                                earliest,
-                                batch_id,
-                            )?;
-                            remove_task_datetime(
-                                wtxn,
-                                self.queue.batches.enqueued_at,
-                                oldest,
-                                batch_id,
-                            )?;
-                        } else {
-                            // If we don't have the enqueued at in the batch it means the database comes from the v1.12
-                            // and we still need to find the date by scrolling the database
-                            remove_n_tasks_datetime_earlier_than(
-                                wtxn,
-                                self.queue.batches.enqueued_at,
-                                batch.started_at,
-                                batch.stats.total_nb_tasks.clamp(1, 2) as usize,
-                                batch_id,
-                            )?;
-                        }
-                        remove_task_datetime(
-                            wtxn,
-                            self.queue.batches.started_at,
-                            batch.started_at,
-                            batch_id,
-                        )?;
-                        if let Some(finished_at) = batch.finished_at {
-                            remove_task_datetime(
-                                wtxn,
-                                self.queue.batches.finished_at,
-                                finished_at,
-                                batch_id,
-                            )?;
-                        }
-
-                        self.queue.batches.all_batches.delete(wtxn, &batch_id)?;
-                        self.queue.batch_to_tasks_mapping.delete(wtxn, &batch_id)?;
-                    }
-                }
-
-                // Anyway, we must remove the batch from all its reverse indexes.
-                // The only way to do that is to check
-
-                for index in affected_indexes.iter() {
-                    let index_tasks = self.queue.tasks.index_tasks(wtxn, index)?;
-                    let remaining_index_tasks = index_tasks & &tasks;
-                    if remaining_index_tasks.is_empty() {
-                        self.queue.batches.update_index(wtxn, index, |bitmap| {
-                            bitmap.remove(batch_id);
-                        })?;
-                    }
-                }
-
-                for status in affected_statuses.iter() {
-                    let status_tasks = self.queue.tasks.get_status(wtxn, *status)?;
-                    let remaining_status_tasks = status_tasks & &tasks;
-                    if remaining_status_tasks.is_empty() {
-                        self.queue.batches.update_status(wtxn, *status, |bitmap| {
-                            bitmap.remove(batch_id);
-                        })?;
-                    }
-                }
-
-                for kind in affected_kinds.iter() {
-                    let kind_tasks = self.queue.tasks.get_kind(wtxn, *kind)?;
-                    let remaining_kind_tasks = kind_tasks & &tasks;
-                    if remaining_kind_tasks.is_empty() {
-                        self.queue.batches.update_kind(wtxn, *kind, |bitmap| {
-                            bitmap.remove(batch_id);
-                        })?;
-                    }
-                }
-            }
-            atomic_progress.fetch_add(1, Ordering::Relaxed);
-        }

+        // 12. Delete batches
+        progress.update_progress(TaskDeletionProgress::DeletingBatches);
+        let (atomic_progress, task_progress) = AtomicTaskStep::new(to_delete_batches.len() as u32);
+        progress.update_progress(task_progress);
+        for range in consecutive_ranges(to_delete_batches.iter()) {
+            self.queue.batches.all_batches.delete_range(wtxn, &range)?;
+            self.queue.batch_to_tasks_mapping.delete_range(wtxn, &range)?;
+            atomic_progress.fetch_add(range.size_hint().0 as u32, Ordering::Relaxed);
+        }
+
+        owned_wtxn.commit()?;
+
        Ok(to_delete_tasks)
    }
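Both new helpers (`remove_task_datetimes` and `remove_batch_datetimes`) turn per-timestamp deletions into a handful of `delete_range` calls: while scanning keys in order, they open a deletion range at the first key whose bitmap becomes empty and close it just before the first key that keeps data. A toy model of that coalescing pass, using a `BTreeMap` and plain `Vec`s in place of the heed database and roaring bitmaps (all names here are illustrative):

```rust
use std::collections::BTreeMap;
use std::ops::Range;

// `db` maps a timestamp to the ids stored under it; keys whose id sets
// become empty are grouped into contiguous ranges instead of being
// deleted one key at a time.
fn coalesce_deletions(db: &BTreeMap<i128, Vec<u32>>, to_remove: &[u32]) -> Vec<Range<i128>> {
    let mut delete_range_start = None;
    let mut ranges = Vec::new();
    let max = *db.keys().max().unwrap_or(&0);
    for (&ts, ids) in db {
        let survives = ids.iter().any(|id| !to_remove.contains(id));
        if !survives {
            delete_range_start.get_or_insert(ts); // open (or extend) a range
        } else if let Some(start) = delete_range_start.take() {
            ranges.push(start..ts); // close the range before the survivor
        }
    }
    if let Some(start) = delete_range_start {
        ranges.push(start..max + 1); // range still open at the end of the scan
    }
    ranges
}

fn main() {
    let db = BTreeMap::from([(10, vec![1]), (11, vec![2]), (12, vec![3, 4]), (13, vec![5])]);
    // Removing ids 1, 2 and 5 empties timestamps 10, 11 and 13 -> two ranges.
    assert_eq!(coalesce_deletions(&db, &[1, 2, 5]), vec![10..12, 13..14]);
}
```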
@@ -83,12 +83,6 @@ impl IndexScheduler {
                t.finished_at = Some(finished_at);
            }

-            // Patch the task to remove the batch uid, because as of v1.12.5 batches are not persisted.
-            // This prevent from referencing *future* batches not actually associated with the task.
-            //
-            // See <https://github.com/meilisearch/meilisearch/issues/5247> for details.
-            t.batch_uid = None;
-
            let mut dump_content_file = dump_tasks.push_task(&t.into())?;

            // 3.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
@@ -20,7 +20,6 @@ failed [3,]
### Kind:
"indexCreation" [1,2,3,4,]
"taskDeletion" [5,]
-"upgradeDatabase" []
----------------------------------------------------------------------
### Index Tasks:
catto [1,]
@@ -50,7 +50,6 @@ doggo [2,3,]
----------------------------------------------------------------------
### Batches Status:
succeeded [0,]
-failed []
----------------------------------------------------------------------
### Batches Kind:
"upgradeDatabase" [0,]
@@ -3,6 +3,7 @@ use meili_snap::snapshot;
use meilisearch_types::milli::obkv_to_json;
use meilisearch_types::milli::update::IndexDocumentsMethod::*;
use meilisearch_types::tasks::KindWithContent;
+use roaring::RoaringBitmap;

use crate::insta_snapshot::snapshot_index_scheduler;
use crate::test_utils::read_json;
@@ -1148,3 +1149,82 @@ fn test_document_addition_with_set_and_null_primary_key_inference_works() {
        .collect::<Vec<_>>();
    snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
}
+
+#[test]
+fn test_task_deletion_issue_5827() {
+    // 1. We're going to autobatch 2 document additions
+    // 2. We will delete the first task
+    // 3. We will delete the second task
+    // 4. The batch should be gone
+
+    let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]);
+
+    let mut tasks = Vec::new();
+    for i in 0..2 {
+        let content = format!(
+            r#"{{
+                "id": {},
+                "doggo": "bob {}"
+            }}"#,
+            i, i
+        );
+
+        let (uuid, mut file) = index_scheduler.queue.create_update_file_with_uuid(i).unwrap();
+        let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
+        file.persist().unwrap();
+        let task = index_scheduler
+            .register(
+                KindWithContent::DocumentAdditionOrUpdate {
+                    index_uid: S("doggos"),
+                    primary_key: Some(S("id")),
+                    method: ReplaceDocuments,
+                    content_file: uuid,
+                    documents_count,
+                    allow_index_creation: true,
+                },
+                None,
+                false,
+            )
+            .unwrap();
+        tasks.push(task);
+        index_scheduler.assert_internally_consistent();
+    }
+
+    handle.advance_one_successful_batch();
+    let rtxn = index_scheduler.read_txn().unwrap();
+    let batches = index_scheduler.queue.batches.all_batch_ids(&rtxn).unwrap();
+    assert_eq!(batches.into_iter().collect::<Vec<_>>().as_slice(), &[0]);
+
+    index_scheduler
+        .register(
+            KindWithContent::TaskDeletion {
+                query: String::from("whatever"),
+                tasks: RoaringBitmap::from_iter([tasks[0].uid]),
+            },
+            None,
+            false,
+        )
+        .unwrap();
+    handle.advance_one_successful_batch();
+    let rtxn = index_scheduler.read_txn().unwrap();
+    let batches = index_scheduler.queue.batches.all_batch_ids(&rtxn).unwrap();
+    assert_eq!(batches.into_iter().collect::<Vec<_>>().as_slice(), &[0, 1]);
+
+    index_scheduler
+        .register(
+            KindWithContent::TaskDeletion {
+                query: String::from("whatever"),
+                tasks: RoaringBitmap::from_iter([tasks[1].uid]),
+            },
+            None,
+            false,
+        )
+        .unwrap();
+    handle.advance_one_successful_batch();
+    let rtxn = index_scheduler.read_txn().unwrap();
+    let batches = index_scheduler.queue.batches.all_batch_ids(&rtxn).unwrap();
+    assert_eq!(batches.into_iter().collect::<Vec<_>>().as_slice(), &[1, 2]);
+
+    let batch0 = index_scheduler.queue.batches.get_batch(&rtxn, 0).unwrap();
+    assert!(batch0.is_none(), "Batch 0 should have been deleted");
+}
@@ -7,6 +7,9 @@ use tracing::info;

use crate::queue::TaskQueue;
use crate::versioning::Versioning;
+use v1_18::V1_17_To_V1_18_0;
+
+mod v1_18;

trait UpgradeIndexScheduler {
    fn upgrade(
@@ -29,7 +32,8 @@ pub fn upgrade_index_scheduler(
    let current_patch = to.2;

    let upgrade_functions: &[&dyn UpgradeIndexScheduler] = &[
-        // This is the last upgrade function, it will be called when the index is up to date.
+        &V1_17_To_V1_18_0 {},
+        // This is the last upgrade function, it will be called when the scheduler is up to date.
        // any other upgrade function should be added before this one.
        &ToCurrentNoOp {},
    ];
@@ -40,6 +44,7 @@ pub fn upgrade_index_scheduler(
        (1, 14, _) => 0,
        (1, 15, _) => 0,
        (1, 16, _) => 0,
+        (1, 17, _) => 0,
        (major, minor, patch) => {
            if major > current_major
                || (major == current_major && minor > current_minor)
crates/index-scheduler/src/upgrade/v1_18.rs (new file, 62 lines)
@@ -0,0 +1,62 @@
use meilisearch_types::{
    heed::Database,
    milli::{CboRoaringBitmapCodec, BEU32},
};
use tracing::info;

use super::UpgradeIndexScheduler;
use crate::queue::{db_name::BATCH_TO_TASKS_MAPPING, BatchQueue};

#[allow(non_camel_case_types)]
pub(super) struct V1_17_To_V1_18_0();

impl UpgradeIndexScheduler for V1_17_To_V1_18_0 {
    fn upgrade(
        &self,
        env: &meilisearch_types::heed::Env<meilisearch_types::heed::WithoutTls>,
        wtxn: &mut meilisearch_types::heed::RwTxn,
        _original: (u32, u32, u32),
    ) -> anyhow::Result<()> {
        let batch_queue = BatchQueue::new(env, wtxn)?;
        let all_batch_ids = batch_queue.all_batch_ids(wtxn)?;

        let batch_to_tasks_mapping: Database<BEU32, CboRoaringBitmapCodec> =
            env.create_database(wtxn, Some(BATCH_TO_TASKS_MAPPING))?;

        let all_batches = batch_queue.all_batches.lazily_decode_data();
        let iter = all_batches.iter(wtxn)?;
        let mut range_start = None;
        let mut count = 0;
        let mut ranges = Vec::new();
        for batch in iter {
            let (batch_id, _) = batch?;

            if !all_batch_ids.contains(batch_id) {
                count += 1;
                if range_start.is_none() {
                    range_start = Some(batch_id);
                }
            } else if let Some(start) = range_start.take() {
                ranges.push(start..batch_id);
            }
        }
        if let Some(start) = range_start {
            ranges.push(start..u32::MAX);
        }

        if !ranges.is_empty() {
            info!("Removing {count} batches that were not properly removed in previous versions due to #5827.");
        }

        for range in ranges {
            batch_queue.all_batches.delete_range(wtxn, &range)?;
            batch_to_tasks_mapping.delete_range(wtxn, &range)?;
        }

        Ok(())
    }

    fn target_version(&self) -> (u32, u32, u32) {
        (1, 18, 0)
    }
}
@@ -2,7 +2,7 @@

use crate::milli::progress::EmbedderStats;
use std::collections::{BTreeSet, HashSet};
-use std::ops::Bound;
+use std::ops::{Bound, RangeInclusive};
use std::sync::Arc;

use meilisearch_types::batches::{Batch, BatchEnqueuedAt, BatchId, BatchStats};
@@ -159,6 +159,30 @@ impl ProcessingBatch {
    }
}

+/// Given a **sorted** iterator of `u32`, return an iterator of the ranges of consecutive values it contains.
+pub(crate) fn consecutive_ranges(
+    iter: impl IntoIterator<Item = u32>,
+) -> impl Iterator<Item = RangeInclusive<u32>> {
+    let mut iter = iter.into_iter().peekable();
+
+    std::iter::from_fn(move || {
+        let start = iter.next()?;
+
+        let mut end = start;
+
+        while let Some(&next) = iter.peek() {
+            if next == end + 1 {
+                end = next;
+                iter.next();
+            } else {
+                break;
+            }
+        }
+
+        Some(start..=end)
+    })
+}
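A quick usage sketch of `consecutive_ranges`, with the helper copied inline so the example runs on its own:

```rust
use std::ops::RangeInclusive;

// Copy of the helper above, for a standalone demonstration.
fn consecutive_ranges(
    iter: impl IntoIterator<Item = u32>,
) -> impl Iterator<Item = RangeInclusive<u32>> {
    let mut iter = iter.into_iter().peekable();
    std::iter::from_fn(move || {
        let start = iter.next()?;
        let mut end = start;
        while let Some(&next) = iter.peek() {
            if next == end + 1 {
                end = next;
                iter.next();
            } else {
                break;
            }
        }
        Some(start..=end)
    })
}

fn main() {
    // Sorted task ids 1,2,3,7,8,10 collapse into three inclusive ranges,
    // letting the scheduler issue three `delete_range` calls instead of six deletes.
    let ranges: Vec<_> = consecutive_ranges([1, 2, 3, 7, 8, 10]).collect();
    assert_eq!(ranges, vec![1..=3, 7..=8, 10..=10]);
}
```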
pub(crate) fn insert_task_datetime(
    wtxn: &mut RwTxn,
    database: Database<BEI128, CboRoaringBitmapCodec>,
@@ -168,7 +192,7 @@ pub(crate) fn insert_task_datetime(
    let timestamp = time.unix_timestamp_nanos();
    let mut task_ids = database.get(wtxn, &timestamp)?.unwrap_or_default();
    task_ids.insert(task_id);
-    database.put(wtxn, &timestamp, &RoaringBitmap::from_iter(task_ids))?;
+    database.put(wtxn, &timestamp, &task_ids)?;
    Ok(())
}

@@ -184,7 +208,7 @@ pub(crate) fn remove_task_datetime(
        if existing.is_empty() {
            database.delete(wtxn, &timestamp)?;
        } else {
-            database.put(wtxn, &timestamp, &RoaringBitmap::from_iter(existing))?;
+            database.put(wtxn, &timestamp, &existing)?;
        }
    }
@@ -47,7 +47,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
    tag = "Export",
    security(("Bearer" = ["export", "*"])),
    responses(
-        (status = 202, description = "Export successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
+        (status = OK, description = "Export successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
            {
                "taskUid": 1,
                "status": "enqueued",
@@ -106,6 +106,7 @@ async fn export(

    analytics.publish(analytics_aggregate, &req);

+    // FIXME: This should be 202 Accepted, but changing would be breaking so we need to wait 2.0
    Ok(HttpResponse::Ok().json(task))
}
@@ -310,7 +310,7 @@ impl Aggregate for DocumentsDeletionAggregator {
        ("documentId" = String, Path, example = "853", description = "Document Identifier", nullable = false),
    ),
    responses(
-        (status = 200, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
+        (status = ACCEPTED, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
            {
                "taskUid": 147,
                "indexUid": null,
@@ -427,7 +427,7 @@ pub struct BrowseQuery {
    params(("indexUid", example = "movies", description = "Index Unique Identifier", nullable = false)),
    request_body = BrowseQuery,
    responses(
-        (status = 200, description = "Task successfully enqueued", body = PaginationView<serde_json::Value>, content_type = "application/json", example = json!(
+        (status = 200, description = "Documents returned", body = PaginationView<serde_json::Value>, content_type = "application/json", example = json!(
            {
                "results":[
                    {
@@ -745,7 +745,7 @@ impl<Method: AggregateMethod> Aggregate for DocumentsAggregator<Method> {
    ),
    request_body = serde_json::Value,
    responses(
-        (status = 200, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
+        (status = ACCEPTED, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
            {
                "taskUid": 147,
                "indexUid": null,
@@ -846,7 +846,7 @@ pub async fn replace_documents(
    ),
    request_body = serde_json::Value,
    responses(
-        (status = 200, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
+        (status = ACCEPTED, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
            {
                "taskUid": 147,
                "indexUid": null,
@@ -1112,7 +1112,7 @@ async fn copy_body_to_file(
    ),
    request_body = Vec<Value>,
    responses(
-        (status = 200, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
+        (status = ACCEPTED, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
            {
                "taskUid": 147,
                "indexUid": null,
@@ -1303,7 +1303,7 @@ impl Aggregate for EditDocumentsByFunctionAggregator {
    ),
    request_body = DocumentEditionByFunction,
    responses(
-        (status = 202, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
+        (status = ACCEPTED, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
            {
                "taskUid": 147,
                "indexUid": null,
@@ -1401,7 +1401,7 @@ pub async fn edit_documents_by_function(
    security(("Bearer" = ["documents.delete", "documents.*", "*"])),
    params(("indexUid", example = "movies", description = "Index Unique Identifier", nullable = false)),
    responses(
-        (status = 200, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
+        (status = ACCEPTED, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
            {
                "taskUid": 147,
                "indexUid": null,
@@ -231,7 +231,7 @@ impl Aggregate for IndexCreatedAggregate {
    security(("Bearer" = ["indexes.create", "indexes.*", "*"])),
    request_body = IndexCreateRequest,
    responses(
-        (status = 200, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
+        (status = ACCEPTED, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
            {
                "taskUid": 147,
                "indexUid": "movies",
@@ -98,7 +98,7 @@ macro_rules! make_setting_route {
            params(("indexUid", example = "movies", description = "Index Unique Identifier", nullable = false)),
            request_body = $type,
            responses(
-                (status = 200, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
+                (status = ACCEPTED, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
                    {
                        "taskUid": 147,
                        "indexUid": "movies",
@@ -162,7 +162,7 @@ macro_rules! make_setting_route {
            params(("indexUid", example = "movies", description = "Index Unique Identifier", nullable = false)),
            request_body = $type,
            responses(
-                (status = 200, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
+                (status = ACCEPTED, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
                    {
                        "taskUid": 147,
                        "indexUid": "movies",
@@ -530,7 +530,7 @@ make_setting_routes!(
    params(("indexUid", example = "movies", description = "Index Unique Identifier", nullable = false)),
    request_body = Settings<Unchecked>,
    responses(
-        (status = 200, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
+        (status = ACCEPTED, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
            {
                "taskUid": 147,
                "indexUid": "movies",
@@ -680,7 +680,7 @@ pub async fn get_all(
    security(("Bearer" = ["settings.update", "settings.*", "*"])),
    params(("indexUid", example = "movies", description = "Index Unique Identifier", nullable = false)),
    responses(
-        (status = 200, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
+        (status = ACCEPTED, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
            {
                "taskUid": 147,
                "indexUid": "movies",
@@ -73,7 +73,7 @@ impl Aggregate for IndexSwappedAnalytics {
    security(("Bearer" = ["search", "*"])),
    request_body = Vec<SwapIndexesPayload>,
    responses(
-        (status = OK, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
+        (status = ACCEPTED, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
            {
                "taskUid": 3,
                "indexUid": null,
@@ -311,7 +311,7 @@ impl<Method: AggregateMethod + 'static> Aggregate for TaskFilterAnalytics<Method
    security(("Bearer" = ["tasks.cancel", "tasks.*", "*"])),
    params(TaskDeletionOrCancelationQuery),
    responses(
-        (status = 200, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
+        (status = OK, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
            {
                "taskUid": 147,
                "indexUid": null,
@@ -392,6 +392,7 @@ async fn cancel_tasks(
        .await??;
    let task: SummarizedTaskView = task.into();

+    // FIXME: This should be 202 Accepted, but changing would be breaking so we need to wait 2.0
    Ok(HttpResponse::Ok().json(task))
}

@@ -405,7 +406,7 @@ async fn cancel_tasks(
    security(("Bearer" = ["tasks.delete", "tasks.*", "*"])),
    params(TaskDeletionOrCancelationQuery),
    responses(
-        (status = 200, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
+        (status = OK, description = "Task successfully enqueued", body = SummarizedTaskView, content_type = "application/json", example = json!(
            {
                "taskUid": 147,
                "indexUid": null,
@@ -485,6 +486,7 @@ async fn delete_tasks(
        .await??;
    let task: SummarizedTaskView = task.into();

+    // FIXME: This should be 202 Accepted, but changing would be breaking so we need to wait 2.0
    Ok(HttpResponse::Ok().json(task))
}
@@ -1274,7 +1274,7 @@ async fn test_summarized_batch_cancelation() {

#[actix_web::test]
async fn test_summarized_batch_deletion() {
-    let server = Server::new_shared();
+    let server = Server::new().await;
    let index = server.unique_index();
    // to avoid being flaky we're only going to delete an already finished batch :(
    let (task, _status_code) = index.create(None).await;
@@ -1293,7 +1293,7 @@ async fn test_summarized_batch_deletion() {
        ".stats.writeChannelCongestion" => "[writeChannelCongestion]",
        ".details.originalFilter" => "?uids=X"
    },
-    @r###"
+    @r#"
    {
        "uid": "[uid]",
        "progress": null,
@@ -1318,7 +1318,7 @@ async fn test_summarized_batch_deletion() {
        "finishedAt": "[date]",
        "batchStrategy": "stopped after the last task of type `taskDeletion` because they cannot be batched with tasks of any other type."
    }
-    "###);
+    "#);
}

#[actix_web::test]
@@ -154,6 +154,24 @@ impl From<Vec<Value>> for Value {
    }
}

+pub trait IntoTaskUid {
+    fn uid(&self) -> u64;
+}
+
+impl IntoTaskUid for Value {
+    fn uid(&self) -> u64 {
+        self["taskUid"].as_u64().unwrap_or_else(|| {
+            panic!("Called `uid` on a Value that doesn't contain a taskUid: {self}")
+        })
+    }
+}
+
+impl IntoTaskUid for u64 {
+    fn uid(&self) -> u64 {
+        *self
+    }
+}
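This trait is what lets `wait_task` (changed just below) accept either a raw uid or the whole JSON response of a previous call. A self-contained sketch of the pattern, using `serde_json::Value` in place of the test suite's own JSON wrapper:

```rust
use serde_json::Value;

trait IntoTaskUid {
    fn uid(&self) -> u64;
}

// A JSON response carries its uid under "taskUid".
impl IntoTaskUid for Value {
    fn uid(&self) -> u64 {
        self["taskUid"].as_u64().expect("no taskUid in value")
    }
}

// A bare uid is already the answer.
impl IntoTaskUid for u64 {
    fn uid(&self) -> u64 {
        *self
    }
}

// Hypothetical helper mirroring the shape of the test suite's `wait_task`.
fn wait_task(update_id: impl IntoTaskUid) -> String {
    format!("/tasks/{}", update_id.uid())
}

fn main() {
    let response: Value = serde_json::json!({ "taskUid": 147 });
    assert_eq!(wait_task(response), "/tasks/147"); // pass the whole response
    assert_eq!(wait_task(147u64), "/tasks/147"); // or just the uid
}
```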
#[macro_export]
macro_rules! json {
    ($($json:tt)+) => {

@@ -426,7 +426,9 @@ impl<State> Server<State> {
        self.service.delete(format!("/tasks?{}", value)).await
    }

-    pub async fn wait_task(&self, update_id: u64) -> Value {
+    pub async fn wait_task(&self, update_id: impl super::IntoTaskUid) -> Value {
+        let update_id = update_id.uid();
+
        // try several times to get status, or panic to not wait forever
        let url = format!("/tasks/{update_id}");
        let max_attempts = 400; // 200 seconds in total, 0.5secs per attempt
crates/meilisearch/tests/tasks/deletion.rs (new file, 247 lines)
@@ -0,0 +1,247 @@
use crate::common::Server;
use crate::json;
use crate::tasks::OffsetDateTime;
use meili_snap::{json_string, snapshot};
use time::format_description::well_known::Rfc3339;
use urlencoding::encode;

#[actix_rt::test]
async fn delete_task() {
    let server = Server::new().await;
    let index = server.unique_index();

    // Add a document
    let (task, code) = index
        .add_documents(json!([{"id": 1, "free": "palestine", "asc_desc_rank": 1}]), Some("id"))
        .await;
    snapshot!(code, @r#"202 Accepted"#);
    let task_uid = task["taskUid"].as_u64().unwrap();
    server.wait_task(task).await.succeeded();

    // Delete tasks
    let (task, code) = server.delete_tasks(&format!("uids={task_uid}")).await;
    snapshot!(code, @"200 OK");
    let value = server.wait_task(task).await.succeeded();
    snapshot!(value, @r#"
    {
      "uid": "[uid]",
      "batchUid": "[batch_uid]",
      "indexUid": null,
      "status": "succeeded",
      "type": "taskDeletion",
      "canceledBy": null,
      "details": {
        "matchedTasks": 1,
        "deletedTasks": 1,
        "originalFilter": "?uids=0"
      },
      "error": null,
      "duration": "[duration]",
      "enqueuedAt": "[date]",
      "startedAt": "[date]",
      "finishedAt": "[date]"
    }
    "#);

    // Check that the task is deleted
    let (value, code) = index.list_tasks().await;
    snapshot!(code, @r#"200 OK"#);
    snapshot!(value, @r#"
    {
      "results": [],
      "total": 0,
      "limit": 20,
      "from": null,
      "next": null
    }
    "#);
}

async fn delete_tasks_time_bounds_inner(name: &str) {
    let server = Server::new().await;
    let index = server.unique_index();

    // Add documents
    for i in 0..2 {
        let (task, code) =
            index.add_documents(json!([{"id": i, "country": "taiwan"}]), Some("id")).await;
        snapshot!(code, @r#"202 Accepted"#);
        server.wait_task(task).await.succeeded();
    }

    let time1 = OffsetDateTime::now_utc();

    for i in 2..4 {
        let (task, code) =
            index.add_documents(json!([{"id": i, "country": "taiwan"}]), Some("id")).await;
        snapshot!(code, @r#"202 Accepted"#);
        server.wait_task(task).await.succeeded();
    }

    let time2 = OffsetDateTime::now_utc();

    for i in 4..6 {
        let (task, code) =
            index.add_documents(json!([{"id": i, "country": "taiwan"}]), Some("id")).await;
        snapshot!(code, @r#"202 Accepted"#);
        server.wait_task(task).await.succeeded();
    }

    // Delete tasks with before_enqueued and after_enqueued
    let (task, code) = server
        .delete_tasks(&format!(
            "before{name}={}&after{name}={}",
            encode(&time2.format(&Rfc3339).unwrap()),
            encode(&time1.format(&Rfc3339).unwrap()),
        ))
        .await;
    snapshot!(code, @"200 OK");
    let value = server.wait_task(task).await.succeeded();
    snapshot!(json_string!(value, {
        ".details.originalFilter" => "[ignored]",
        ".duration" => "[duration]",
        ".enqueuedAt" => "[date]",
        ".startedAt" => "[date]",
        ".finishedAt" => "[date]"
    }), @r#"
    {
      "uid": 6,
      "batchUid": 6,
      "indexUid": null,
      "status": "succeeded",
      "type": "taskDeletion",
      "canceledBy": null,
      "details": {
        "matchedTasks": 2,
        "deletedTasks": 2,
        "originalFilter": "[ignored]"
      },
      "error": null,
      "duration": "[duration]",
      "enqueuedAt": "[date]",
      "startedAt": "[date]",
      "finishedAt": "[date]"
    }
    "#);

    // Check that the task is deleted
    let (value, code) = server.tasks().await;
    snapshot!(code, @r#"200 OK"#);
    snapshot!(json_string!(value, {
        ".results[].duration" => "[duration]",
        ".results[].enqueuedAt" => "[date]",
        ".results[].startedAt" => "[date]",
        ".results[].finishedAt" => "[date]",
        ".results[].details.originalFilter" => "[ignored]"
    }), @r#"
    {
      "results": [
        {
          "uid": 6,
          "batchUid": 6,
          "indexUid": null,
          "status": "succeeded",
          "type": "taskDeletion",
          "canceledBy": null,
          "details": {
            "matchedTasks": 2,
            "deletedTasks": 2,
            "originalFilter": "[ignored]"
          },
          "error": null,
          "duration": "[duration]",
          "enqueuedAt": "[date]",
          "startedAt": "[date]",
          "finishedAt": "[date]"
        },
        {
          "uid": 5,
          "batchUid": 5,
          "indexUid": "[uuid]",
          "status": "succeeded",
          "type": "documentAdditionOrUpdate",
          "canceledBy": null,
          "details": {
            "receivedDocuments": 1,
            "indexedDocuments": 1
          },
          "error": null,
          "duration": "[duration]",
          "enqueuedAt": "[date]",
          "startedAt": "[date]",
          "finishedAt": "[date]"
        },
        {
          "uid": 4,
          "batchUid": 4,
          "indexUid": "[uuid]",
          "status": "succeeded",
          "type": "documentAdditionOrUpdate",
          "canceledBy": null,
          "details": {
            "receivedDocuments": 1,
            "indexedDocuments": 1
          },
          "error": null,
          "duration": "[duration]",
          "enqueuedAt": "[date]",
          "startedAt": "[date]",
          "finishedAt": "[date]"
        },
        {
          "uid": 1,
          "batchUid": 1,
          "indexUid": "[uuid]",
          "status": "succeeded",
          "type": "documentAdditionOrUpdate",
          "canceledBy": null,
          "details": {
            "receivedDocuments": 1,
            "indexedDocuments": 1
          },
          "error": null,
          "duration": "[duration]",
          "enqueuedAt": "[date]",
          "startedAt": "[date]",
          "finishedAt": "[date]"
        },
        {
          "uid": 0,
          "batchUid": 0,
          "indexUid": "[uuid]",
          "status": "succeeded",
          "type": "documentAdditionOrUpdate",
          "canceledBy": null,
          "details": {
            "receivedDocuments": 1,
            "indexedDocuments": 1
          },
          "error": null,
          "duration": "[duration]",
          "enqueuedAt": "[date]",
          "startedAt": "[date]",
          "finishedAt": "[date]"
        }
      ],
      "total": 5,
      "limit": 20,
      "from": 6,
      "next": null
    }
    "#);
}

#[actix_rt::test]
async fn delete_tasks_enqueued() {
    delete_tasks_time_bounds_inner("EnqueuedAt").await;
}

#[actix_rt::test]
async fn delete_tasks_started() {
    delete_tasks_time_bounds_inner("StartedAt").await;
}

#[actix_rt::test]
async fn delete_tasks_finished() {
    delete_tasks_time_bounds_inner("FinishedAt").await;
}
@@ -1,3 +1,4 @@
+mod deletion;
mod errors;
mod webhook;
@@ -3,16 +3,17 @@ mod v1_13;
mod v1_14;
mod v1_15;
mod v1_16;
+mod v1_17;
use heed::RwTxn;
use v1_12::{V1_12_3_To_V1_13_0, V1_12_To_V1_12_3};
use v1_13::{V1_13_0_To_V1_13_1, V1_13_1_To_Latest_V1_13};
use v1_14::Latest_V1_13_To_Latest_V1_14;
use v1_15::Latest_V1_14_To_Latest_V1_15;
-use v1_16::Latest_V1_16_To_V1_17_0;
+use v1_16::Latest_V1_15_To_V1_16_0;
+use v1_17::Latest_V1_16_To_V1_17_0;

use crate::constants::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
use crate::progress::{Progress, VariableNameStep};
-use crate::update::upgrade::v1_16::Latest_V1_15_To_V1_16_0;
use crate::{Index, InternalError, Result};

trait UpgradeIndex {
@@ -46,22 +46,3 @@ impl UpgradeIndex for Latest_V1_15_To_V1_16_0 {
        (1, 16, 0)
    }
}
-
-#[allow(non_camel_case_types)]
-pub(super) struct Latest_V1_16_To_V1_17_0();
-
-impl UpgradeIndex for Latest_V1_16_To_V1_17_0 {
-    fn upgrade(
-        &self,
-        _wtxn: &mut RwTxn,
-        _index: &Index,
-        _original: (u32, u32, u32),
-        _progress: Progress,
-    ) -> Result<bool> {
-        Ok(false)
-    }
-
-    fn target_version(&self) -> (u32, u32, u32) {
-        (1, 17, 0)
-    }
-}
crates/milli/src/update/upgrade/v1_17.rs (new file, 24 lines)
@@ -0,0 +1,24 @@
use heed::RwTxn;

use super::UpgradeIndex;
use crate::progress::Progress;
use crate::{Index, Result};

#[allow(non_camel_case_types)]
pub(super) struct Latest_V1_16_To_V1_17_0();

impl UpgradeIndex for Latest_V1_16_To_V1_17_0 {
    fn upgrade(
        &self,
        _wtxn: &mut RwTxn,
        _index: &Index,
        _original: (u32, u32, u32),
        _progress: Progress,
    ) -> Result<bool> {
        Ok(false)
    }

    fn target_version(&self) -> (u32, u32, u32) {
        (1, 17, 0)
    }
}