4435: Make update file deletion atomic r=Kerollmops a=irevoire

# Pull Request

## Related issue
Fixes https://github.com/meilisearch/meilisearch/issues/4432
Fixes https://github.com/meilisearch/meilisearch/issues/4438 by adding the logs the user asked

## What does this PR do?
- Adds a bunch of logs to help debug this kind of issue in the future
- Delete the update files AFTER committing the update in the `index-scheduler` (thus, if a restart happens, we are able to re-process the batch successfully)
- Multi-thread the deletion of all update files.


Co-authored-by: Tamo <tamo@meilisearch.com>
This commit is contained in:
meili-bors[bot]
2024-02-26 17:54:40 +00:00
committed by GitHub
6 changed files with 80 additions and 29 deletions

10
Cargo.lock generated
View File

@@ -1728,6 +1728,7 @@ dependencies = [
"faux", "faux",
"tempfile", "tempfile",
"thiserror", "thiserror",
"tracing",
"uuid", "uuid",
] ]
@@ -2393,6 +2394,7 @@ dependencies = [
"meilisearch-types", "meilisearch-types",
"page_size 0.5.0", "page_size 0.5.0",
"puffin", "puffin",
"rayon",
"roaring", "roaring",
"serde", "serde",
"serde_json", "serde_json",
@@ -4077,9 +4079,9 @@ dependencies = [
[[package]] [[package]]
name = "rayon" name = "rayon"
version = "1.8.0" version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c27db03db7734835b3f53954b534c91069375ce6ccaa2e065441e07d9b6cdb1" checksum = "fa7237101a77a10773db45d62004a272517633fbcc3df19d96455ede1122e051"
dependencies = [ dependencies = [
"either", "either",
"rayon-core", "rayon-core",
@@ -4098,9 +4100,9 @@ dependencies = [
[[package]] [[package]]
name = "rayon-core" name = "rayon-core"
version = "1.12.0" version = "1.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ce3fb6ad83f861aac485e76e1985cd109d9a3713802152be56c3b1f0e0658ed" checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
dependencies = [ dependencies = [
"crossbeam-deque", "crossbeam-deque",
"crossbeam-utils", "crossbeam-utils",

View File

@@ -13,6 +13,7 @@ license.workspace = true
[dependencies] [dependencies]
tempfile = "3.9.0" tempfile = "3.9.0"
thiserror = "1.0.56" thiserror = "1.0.56"
tracing = "0.1.40"
uuid = { version = "1.6.1", features = ["serde", "v4"] } uuid = { version = "1.6.1", features = ["serde", "v4"] }
[dev-dependencies] [dev-dependencies]

View File

@@ -61,7 +61,13 @@ impl FileStore {
/// Returns the file corresponding to the requested uuid. /// Returns the file corresponding to the requested uuid.
pub fn get_update(&self, uuid: Uuid) -> Result<StdFile> { pub fn get_update(&self, uuid: Uuid) -> Result<StdFile> {
let path = self.get_update_path(uuid); let path = self.get_update_path(uuid);
let file = StdFile::open(path)?; let file = match StdFile::open(path) {
Ok(file) => file,
Err(e) => {
tracing::error!("Can't access update file {uuid}: {e}");
return Err(e.into());
}
};
Ok(file) Ok(file)
} }
@@ -96,9 +102,13 @@ impl FileStore {
pub fn delete(&self, uuid: Uuid) -> Result<()> { pub fn delete(&self, uuid: Uuid) -> Result<()> {
let path = self.path.join(uuid.to_string()); let path = self.path.join(uuid.to_string());
std::fs::remove_file(path)?; if let Err(e) = std::fs::remove_file(path) {
tracing::error!("Can't delete file {uuid}: {e}");
Err(e.into())
} else {
Ok(()) Ok(())
} }
}
/// List the Uuids of the files in the FileStore /// List the Uuids of the files in the FileStore
pub fn all_uuids(&self) -> Result<impl Iterator<Item = Result<Uuid>>> { pub fn all_uuids(&self) -> Result<impl Iterator<Item = Result<Uuid>>> {

View File

@@ -23,6 +23,7 @@ meilisearch-auth = { path = "../meilisearch-auth" }
meilisearch-types = { path = "../meilisearch-types" } meilisearch-types = { path = "../meilisearch-types" }
page_size = "0.5.0" page_size = "0.5.0"
puffin = { version = "0.16.0", features = ["serialization"] } puffin = { version = "0.16.0", features = ["serialization"] }
rayon = "1.8.1"
roaring = { version = "0.10.2", features = ["serde"] } roaring = { version = "0.10.2", features = ["serde"] }
serde = { version = "1.0.195", features = ["derive"] } serde = { version = "1.0.195", features = ["derive"] }
serde_json = { version = "1.0.111", features = ["preserve_order"] } serde_json = { version = "1.0.111", features = ["preserve_order"] }

View File

@@ -142,22 +142,28 @@ pub(crate) enum IndexOperation {
impl Batch { impl Batch {
/// Return the task ids associated with this batch. /// Return the task ids associated with this batch.
pub fn ids(&self) -> Vec<TaskId> { pub fn ids(&self) -> RoaringBitmap {
match self { match self {
Batch::TaskCancelation { task, .. } Batch::TaskCancelation { task, .. }
| Batch::Dump(task) | Batch::Dump(task)
| Batch::IndexCreation { task, .. } | Batch::IndexCreation { task, .. }
| Batch::IndexUpdate { task, .. } => vec![task.uid], | Batch::IndexUpdate { task, .. } => {
RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap()
}
Batch::SnapshotCreation(tasks) Batch::SnapshotCreation(tasks)
| Batch::TaskDeletions(tasks) | Batch::TaskDeletions(tasks)
| Batch::IndexDeletion { tasks, .. } => tasks.iter().map(|task| task.uid).collect(), | Batch::IndexDeletion { tasks, .. } => {
RoaringBitmap::from_iter(tasks.iter().map(|task| task.uid))
}
Batch::IndexOperation { op, .. } => match op { Batch::IndexOperation { op, .. } => match op {
IndexOperation::DocumentOperation { tasks, .. } IndexOperation::DocumentOperation { tasks, .. }
| IndexOperation::Settings { tasks, .. } | IndexOperation::Settings { tasks, .. }
| IndexOperation::DocumentClear { tasks, .. } => { | IndexOperation::DocumentClear { tasks, .. } => {
tasks.iter().map(|task| task.uid).collect() RoaringBitmap::from_iter(tasks.iter().map(|task| task.uid))
}
IndexOperation::IndexDocumentDeletionByFilter { task, .. } => {
RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap()
} }
IndexOperation::IndexDocumentDeletionByFilter { task, .. } => vec![task.uid],
IndexOperation::SettingsAndDocumentOperation { IndexOperation::SettingsAndDocumentOperation {
document_import_tasks: tasks, document_import_tasks: tasks,
settings_tasks: other, settings_tasks: other,
@@ -167,9 +173,11 @@ impl Batch {
cleared_tasks: tasks, cleared_tasks: tasks,
settings_tasks: other, settings_tasks: other,
.. ..
} => tasks.iter().chain(other).map(|task| task.uid).collect(), } => RoaringBitmap::from_iter(tasks.iter().chain(other).map(|task| task.uid)),
}, },
Batch::IndexSwap { task } => vec![task.uid], Batch::IndexSwap { task } => {
RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap()
}
} }
} }

View File

@@ -37,8 +37,8 @@ use std::fs::File;
use std::io::{self, BufReader, Read}; use std::io::{self, BufReader, Read};
use std::ops::{Bound, RangeBounds}; use std::ops::{Bound, RangeBounds};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::{self, Relaxed};
use std::sync::atomic::Ordering::Relaxed; use std::sync::atomic::{AtomicBool, AtomicU32};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::time::Duration; use std::time::Duration;
@@ -60,6 +60,8 @@ use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmap
use meilisearch_types::task_view::TaskView; use meilisearch_types::task_view::TaskView;
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task}; use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
use puffin::FrameView; use puffin::FrameView;
use rayon::current_num_threads;
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use synchronoise::SignalEvent; use synchronoise::SignalEvent;
use time::format_description::well_known::Rfc3339; use time::format_description::well_known::Rfc3339;
@@ -1170,15 +1172,13 @@ impl IndexScheduler {
drop(rtxn); drop(rtxn);
// 1. store the starting date with the bitmap of processing tasks. // 1. store the starting date with the bitmap of processing tasks.
let mut ids = batch.ids(); let ids = batch.ids();
ids.sort_unstable();
let processed_tasks = ids.len(); let processed_tasks = ids.len();
let processing_tasks = RoaringBitmap::from_sorted_iter(ids.iter().copied()).unwrap();
let started_at = OffsetDateTime::now_utc(); let started_at = OffsetDateTime::now_utc();
// We reset the must_stop flag to be sure that we don't stop processing tasks // We reset the must_stop flag to be sure that we don't stop processing tasks
self.must_stop_processing.reset(); self.must_stop_processing.reset();
self.processing_tasks.write().unwrap().start_processing_at(started_at, processing_tasks); self.processing_tasks.write().unwrap().start_processing_at(started_at, ids.clone());
#[cfg(test)] #[cfg(test)]
self.breakpoint(Breakpoint::BatchCreated); self.breakpoint(Breakpoint::BatchCreated);
@@ -1207,6 +1207,9 @@ impl IndexScheduler {
#[cfg(test)] #[cfg(test)]
self.breakpoint(Breakpoint::ProcessBatchSucceeded); self.breakpoint(Breakpoint::ProcessBatchSucceeded);
let mut success = 0;
let mut failure = 0;
#[allow(unused_variables)] #[allow(unused_variables)]
for (i, mut task) in tasks.into_iter().enumerate() { for (i, mut task) in tasks.into_iter().enumerate() {
task.started_at = Some(started_at); task.started_at = Some(started_at);
@@ -1219,13 +1222,15 @@ impl IndexScheduler {
}, },
)?; )?;
match task.error {
Some(_) => failure += 1,
None => success += 1,
}
self.update_task(&mut wtxn, &task) self.update_task(&mut wtxn, &task)
.map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?; .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?;
if let Err(e) = self.delete_persisted_task_data(&task) {
tracing::error!("Failure to delete the content files associated with task {}. Error: {e}", task.uid);
} }
} tracing::info!("A batch of tasks was successfully completed with {success} successful tasks and {failure} failed tasks.");
tracing::info!("A batch of tasks was successfully completed.");
} }
// If we have an abortion error we must stop the tick here and re-schedule tasks. // If we have an abortion error we must stop the tick here and re-schedule tasks.
Err(Error::Milli(milli::Error::InternalError( Err(Error::Milli(milli::Error::InternalError(
@@ -1236,6 +1241,7 @@ impl IndexScheduler {
self.breakpoint(Breakpoint::AbortedIndexation); self.breakpoint(Breakpoint::AbortedIndexation);
wtxn.abort(); wtxn.abort();
tracing::info!("A batch of tasks was aborted.");
// We make sure that we don't call `stop_processing` on the `processing_tasks`, // We make sure that we don't call `stop_processing` on the `processing_tasks`,
// this is because we want to let the next tick call `create_next_batch` and keep // this is because we want to let the next tick call `create_next_batch` and keep
// the `started_at` date times and `processings` of the current processing tasks. // the `started_at` date times and `processings` of the current processing tasks.
@@ -1257,6 +1263,8 @@ impl IndexScheduler {
self.index_mapper.resize_index(&wtxn, &index_uid)?; self.index_mapper.resize_index(&wtxn, &index_uid)?;
wtxn.abort(); wtxn.abort();
tracing::info!("The max database size was reached. Resizing the index.");
return Ok(TickOutcome::TickAgain(0)); return Ok(TickOutcome::TickAgain(0));
} }
// In case of a failure we must get back and patch all the tasks with the error. // In case of a failure we must get back and patch all the tasks with the error.
@@ -1264,7 +1272,7 @@ impl IndexScheduler {
#[cfg(test)] #[cfg(test)]
self.breakpoint(Breakpoint::ProcessBatchFailed); self.breakpoint(Breakpoint::ProcessBatchFailed);
let error: ResponseError = err.into(); let error: ResponseError = err.into();
for id in ids { for id in ids.iter() {
let mut task = self let mut task = self
.get_task(&wtxn, id) .get_task(&wtxn, id)
.map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))? .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?
@@ -1278,9 +1286,8 @@ impl IndexScheduler {
#[cfg(test)] #[cfg(test)]
self.maybe_fail(tests::FailureLocation::UpdatingTaskAfterProcessBatchFailure)?; self.maybe_fail(tests::FailureLocation::UpdatingTaskAfterProcessBatchFailure)?;
if let Err(e) = self.delete_persisted_task_data(&task) { tracing::info!("Batch failed {}", error);
tracing::error!("Failure to delete the content files associated with task {}. Error: {e}", task.uid);
}
self.update_task(&mut wtxn, &task) self.update_task(&mut wtxn, &task)
.map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?; .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?;
} }
@@ -1294,6 +1301,28 @@ impl IndexScheduler {
wtxn.commit().map_err(Error::HeedTransaction)?; wtxn.commit().map_err(Error::HeedTransaction)?;
// Once the tasks are commited, we should delete all the update files associated ASAP to avoid leaking files in case of a restart
tracing::debug!("Deleting the upadate files");
//We take one read transaction **per thread**. Then, every thread is going to pull out new IDs from the roaring bitmap with the help of an atomic shared index into the bitmap
let idx = AtomicU32::new(0);
(0..current_num_threads()).into_par_iter().try_for_each(|_| -> Result<()> {
let rtxn = self.read_txn()?;
while let Some(id) = ids.select(idx.fetch_add(1, Ordering::Relaxed)) {
let task = self
.get_task(&rtxn, id)
.map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?
.ok_or(Error::CorruptedTaskQueue)?;
if let Err(e) = self.delete_persisted_task_data(&task) {
tracing::error!(
"Failure to delete the content files associated with task {}. Error: {e}",
task.uid
);
}
}
Ok(())
})?;
// We shouldn't crash the tick function if we can't send data to the webhook. // We shouldn't crash the tick function if we can't send data to the webhook.
let _ = self.notify_webhook(&processed); let _ = self.notify_webhook(&processed);
@@ -1706,7 +1735,7 @@ pub enum TickOutcome {
/// The scheduler should immediately attempt another `tick`. /// The scheduler should immediately attempt another `tick`.
/// ///
/// The `usize` field contains the number of processed tasks. /// The `usize` field contains the number of processed tasks.
TickAgain(usize), TickAgain(u64),
/// The scheduler should wait for an external signal before attempting another `tick`. /// The scheduler should wait for an external signal before attempting another `tick`.
WaitForSignal, WaitForSignal,
} }