move the version check to the task queue

This commit is contained in:
Tamo
2025-01-16 11:00:29 +01:00
committed by Louis Dureuil
parent e70ac35e02
commit 3ef7a478cd
9 changed files with 95 additions and 72 deletions

View File

@@ -147,7 +147,7 @@ pub enum Error {
#[error("Corrupted task queue.")] #[error("Corrupted task queue.")]
CorruptedTaskQueue, CorruptedTaskQueue,
#[error(transparent)] #[error(transparent)]
TaskDatabaseUpdate(Box<Self>), TaskDatabaseUpgrade(Box<Self>),
#[error(transparent)] #[error(transparent)]
HeedTransaction(heed::Error), HeedTransaction(heed::Error),
@@ -202,7 +202,7 @@ impl Error {
| Error::Anyhow(_) => true, | Error::Anyhow(_) => true,
Error::CreateBatch(_) Error::CreateBatch(_)
| Error::CorruptedTaskQueue | Error::CorruptedTaskQueue
| Error::TaskDatabaseUpdate(_) | Error::TaskDatabaseUpgrade(_)
| Error::HeedTransaction(_) => false, | Error::HeedTransaction(_) => false,
#[cfg(test)] #[cfg(test)]
Error::PlannedFailure => false, Error::PlannedFailure => false,
@@ -266,7 +266,7 @@ impl ErrorCode for Error {
Error::Anyhow(_) => Code::Internal, Error::Anyhow(_) => Code::Internal,
Error::CorruptedTaskQueue => Code::Internal, Error::CorruptedTaskQueue => Code::Internal,
Error::CorruptedDump => Code::Internal, Error::CorruptedDump => Code::Internal,
Error::TaskDatabaseUpdate(_) => Code::Internal, Error::TaskDatabaseUpgrade(_) => Code::Internal,
Error::CreateBatch(_) => Code::Internal, Error::CreateBatch(_) => Code::Internal,
// This one should never be seen by the end user // This one should never be seen by the end user

View File

@@ -369,6 +369,7 @@ impl IndexScheduler {
match ret { match ret {
Ok(Ok(TickOutcome::TickAgain(_))) => (), Ok(Ok(TickOutcome::TickAgain(_))) => (),
Ok(Ok(TickOutcome::WaitForSignal)) => run.scheduler.wake_up.wait(), Ok(Ok(TickOutcome::WaitForSignal)) => run.scheduler.wake_up.wait(),
Ok(Ok(TickOutcome::StopProcessingForever)) => break,
Ok(Err(e)) => { Ok(Err(e)) => {
tracing::error!("{e}"); tracing::error!("{e}");
// Wait one second when an irrecoverable error occurs. // Wait one second when an irrecoverable error occurs.
@@ -816,6 +817,8 @@ pub enum TickOutcome {
TickAgain(u64), TickAgain(u64),
/// The scheduler should wait for an external signal before attempting another `tick`. /// The scheduler should wait for an external signal before attempting another `tick`.
WaitForSignal, WaitForSignal,
/// The scheduler exits the run-loop and will never process tasks again
StopProcessingForever,
} }
/// How many indexes we can afford to have open simultaneously. /// How many indexes we can afford to have open simultaneously.

View File

@@ -184,6 +184,7 @@ impl IndexScheduler {
progress.update_progress(BatchProgress::WritingTasksToDisk); progress.update_progress(BatchProgress::WritingTasksToDisk);
processing_batch.finished(); processing_batch.finished();
let mut stop_scheduler_forever = false;
let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?; let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?;
let mut canceled = RoaringBitmap::new(); let mut canceled = RoaringBitmap::new();
@@ -222,7 +223,7 @@ impl IndexScheduler {
self.queue self.queue
.tasks .tasks
.update_task(&mut wtxn, &task) .update_task(&mut wtxn, &task)
.map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?; .map_err(|e| Error::TaskDatabaseUpgrade(Box::new(e)))?;
} }
if let Some(canceled_by) = canceled_by { if let Some(canceled_by) = canceled_by {
self.queue.tasks.canceled_by.put(&mut wtxn, &canceled_by, &canceled)?; self.queue.tasks.canceled_by.put(&mut wtxn, &canceled_by, &canceled)?;
@@ -273,6 +274,12 @@ impl IndexScheduler {
let (task_progress, task_progress_obj) = AtomicTaskStep::new(ids.len() as u32); let (task_progress, task_progress_obj) = AtomicTaskStep::new(ids.len() as u32);
progress.update_progress(task_progress_obj); progress.update_progress(task_progress_obj);
if matches!(err, Error::TaskDatabaseUpgrade(_)) {
tracing::error!(
"Upgrade task failed, tasks won't be processed until the following issue is fixed: {err}"
);
stop_scheduler_forever = true;
}
let error: ResponseError = err.into(); let error: ResponseError = err.into();
for id in ids.iter() { for id in ids.iter() {
task_progress.fetch_add(1, Ordering::Relaxed); task_progress.fetch_add(1, Ordering::Relaxed);
@@ -280,7 +287,7 @@ impl IndexScheduler {
.queue .queue
.tasks .tasks
.get_task(&wtxn, id) .get_task(&wtxn, id)
.map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))? .map_err(|e| Error::TaskDatabaseUpgrade(Box::new(e)))?
.ok_or(Error::CorruptedTaskQueue)?; .ok_or(Error::CorruptedTaskQueue)?;
task.status = Status::Failed; task.status = Status::Failed;
task.error = Some(error.clone()); task.error = Some(error.clone());
@@ -297,7 +304,7 @@ impl IndexScheduler {
self.queue self.queue
.tasks .tasks
.update_task(&mut wtxn, &task) .update_task(&mut wtxn, &task)
.map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?; .map_err(|e| Error::TaskDatabaseUpgrade(Box::new(e)))?;
} }
} }
} }
@@ -327,7 +334,7 @@ impl IndexScheduler {
.queue .queue
.tasks .tasks
.get_task(&rtxn, id) .get_task(&rtxn, id)
.map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))? .map_err(|e| Error::TaskDatabaseUpgrade(Box::new(e)))?
.ok_or(Error::CorruptedTaskQueue)?; .ok_or(Error::CorruptedTaskQueue)?;
if let Err(e) = self.queue.delete_persisted_task_data(&task) { if let Err(e) = self.queue.delete_persisted_task_data(&task) {
tracing::error!( tracing::error!(
@@ -345,6 +352,10 @@ impl IndexScheduler {
#[cfg(test)] #[cfg(test)]
self.breakpoint(crate::test_utils::Breakpoint::AfterProcessing); self.breakpoint(crate::test_utils::Breakpoint::AfterProcessing);
Ok(TickOutcome::TickAgain(processed_tasks)) if stop_scheduler_forever {
Ok(TickOutcome::StopProcessingForever)
} else {
Ok(TickOutcome::TickAgain(processed_tasks))
}
} }
} }

View File

@@ -1,4 +1,5 @@
use std::collections::{BTreeSet, HashMap, HashSet}; use std::collections::{BTreeSet, HashMap, HashSet};
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use meilisearch_types::batches::BatchId; use meilisearch_types::batches::BatchId;
@@ -314,7 +315,24 @@ impl IndexScheduler {
task.status = Status::Succeeded; task.status = Status::Succeeded;
Ok(vec![task]) Ok(vec![task])
} }
Batch::UpgradeDatabase { tasks } => self.process_upgrade(progress, tasks), Batch::UpgradeDatabase { mut tasks } => {
let ret = catch_unwind(AssertUnwindSafe(|| self.process_upgrade(progress)));
match ret {
Ok(Ok(())) => (),
Ok(Err(e)) => return Err(Error::TaskDatabaseUpgrade(Box::new(e))),
Err(_e) => {
return Err(Error::TaskDatabaseUpgrade(Box::new(
Error::ProcessBatchPanicked,
)));
}
}
for task in tasks.iter_mut() {
task.status = Status::Succeeded;
}
Ok(tasks)
}
} }
} }

View File

@@ -1,24 +1,14 @@
use meilisearch_types::{ use meilisearch_types::{
milli, milli,
milli::progress::{Progress, VariableNameStep}, milli::progress::{Progress, VariableNameStep},
tasks::{KindWithContent, Status, Task},
}; };
use crate::{processing::UpgradeDatabaseProgress, Error, IndexScheduler, Result}; use crate::{processing::UpgradeDatabaseProgress, Error, IndexScheduler, Result};
impl IndexScheduler { impl IndexScheduler {
pub(super) fn process_upgrade( pub(super) fn process_upgrade(&self, progress: Progress) -> Result<()> {
&self,
progress: Progress,
mut tasks: Vec<Task>,
) -> Result<Vec<Task>> {
progress.update_progress(UpgradeDatabaseProgress::EnsuringCorrectnessOfTheSwap); progress.update_progress(UpgradeDatabaseProgress::EnsuringCorrectnessOfTheSwap);
// Since we should not have multiple upgrade tasks, we're only going to process the latest one:
let KindWithContent::UpgradeDatabase { from } = tasks.last().unwrap().kind else {
unreachable!()
};
enum UpgradeIndex {} enum UpgradeIndex {}
let indexes = self.index_names()?; let indexes = self.index_names()?;
@@ -29,14 +19,10 @@ impl IndexScheduler {
indexes.len() as u32, indexes.len() as u32,
)); ));
let index = self.index(uid)?; let index = self.index(uid)?;
milli::update::upgrade::upgrade(&index, from, progress.clone()) milli::update::upgrade::upgrade(&index, progress.clone())
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
} }
for task in tasks.iter_mut() { Ok(())
task.status = Status::Succeeded;
}
Ok(tasks)
} }
} }

View File

@@ -1,16 +1,53 @@
use std::path::Path; use std::path::Path;
use anyhow::bail;
use meilisearch_types::{ use meilisearch_types::{
heed, heed,
tasks::{KindWithContent, Status, Task}, tasks::{KindWithContent, Status, Task},
versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH},
}; };
use time::OffsetDateTime; use time::OffsetDateTime;
use tracing::info; use tracing::info;
use crate::queue::TaskQueue; use crate::queue::TaskQueue;
pub fn upgrade_task_queue(tasks_path: &Path, version: (u32, u32, u32)) -> anyhow::Result<()> { pub fn upgrade_task_queue(tasks_path: &Path, from: (u32, u32, u32)) -> anyhow::Result<()> {
let current_major: u32 = VERSION_MAJOR.parse().unwrap();
let current_minor: u32 = VERSION_MINOR.parse().unwrap();
let current_patch: u32 = VERSION_PATCH.parse().unwrap();
let upgrade_functions =
[(v1_12_to_current as fn(&Path) -> anyhow::Result<()>, "Upgrading from v1.12 to v1.13")];
let start = match from {
(1, 12, _) => 0,
(major, minor, patch) => {
if major > current_major
|| (major == current_major && minor > current_minor)
|| (major == current_major && minor == current_minor && patch > current_patch)
{
bail!(
"Database version {major}.{minor}.{patch} is higher than the binary version {current_major}.{current_minor}.{current_patch}. Downgrade is not supported",
);
} else if major < current_major
|| (major == current_major && minor < current_minor)
|| (major == current_major && minor == current_minor && patch < current_patch)
{
bail!(
"Database version {major}.{minor}.{patch} is too old for the experimental dumpless upgrade feature. Please generate a dump using the v{major}.{minor}.{patch} and imports it in the v{current_major}.{current_minor}.{current_patch}",
);
} else {
bail!("Unknown database version: v{major}.{minor}.{patch}");
}
}
};
info!("Upgrading the task queue"); info!("Upgrading the task queue");
for (upgrade, upgrade_name) in upgrade_functions[start..].iter() {
info!("{upgrade_name}");
(upgrade)(tasks_path)?;
}
let env = unsafe { let env = unsafe {
heed::EnvOpenOptions::new() heed::EnvOpenOptions::new()
.max_dbs(19) .max_dbs(19)
@@ -33,7 +70,7 @@ pub fn upgrade_task_queue(tasks_path: &Path, version: (u32, u32, u32)) -> anyhow
canceled_by: None, canceled_by: None,
details: None, details: None,
status: Status::Enqueued, status: Status::Enqueued,
kind: KindWithContent::UpgradeDatabase { from: version }, kind: KindWithContent::UpgradeDatabase { from },
}, },
)?; )?;
wtxn.commit()?; wtxn.commit()?;
@@ -41,3 +78,8 @@ pub fn upgrade_task_queue(tasks_path: &Path, version: (u32, u32, u32)) -> anyhow
env.prepare_for_closing().wait(); env.prepare_for_closing().wait();
Ok(()) Ok(())
} }
/// The task queue is 100% compatible with the previous versions
fn v1_12_to_current(_path: &Path) -> anyhow::Result<()> {
Ok(())
}

View File

@@ -456,9 +456,6 @@ impl ErrorCode for milli::Error {
| UserError::DocumentEditionCompilationError(_) => { | UserError::DocumentEditionCompilationError(_) => {
Code::EditDocumentsByFunctionError Code::EditDocumentsByFunctionError
} }
UserError::TooOldForUpgrade(_, _, _)
| UserError::CannotDowngrade(_, _, _)
| UserError::CannotUpgradeToUnknownVersion(_, _, _) => Code::CouldNotUpgrade,
} }
} }
} }

View File

@@ -10,7 +10,7 @@ use rhai::EvalAltResult;
use serde_json::Value; use serde_json::Value;
use thiserror::Error; use thiserror::Error;
use crate::constants::{RESERVED_GEO_FIELD_NAME, VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH}; use crate::constants::RESERVED_GEO_FIELD_NAME;
use crate::documents::{self, DocumentsBatchCursorError}; use crate::documents::{self, DocumentsBatchCursorError};
use crate::thread_pool_no_abort::PanicCatched; use crate::thread_pool_no_abort::PanicCatched;
use crate::{CriterionError, DocumentId, FieldId, Object, SortError}; use crate::{CriterionError, DocumentId, FieldId, Object, SortError};
@@ -74,6 +74,8 @@ pub enum InternalError {
AbortedIndexation, AbortedIndexation,
#[error("The matching words list contains at least one invalid member")] #[error("The matching words list contains at least one invalid member")]
InvalidMatchingWords, InvalidMatchingWords,
#[error("Cannot upgrade to the following version: v{0}.{1}.{2}.")]
CannotUpgradeToVersion(u32, u32, u32),
#[error(transparent)] #[error(transparent)]
ArroyError(#[from] arroy::Error), ArroyError(#[from] arroy::Error),
#[error(transparent)] #[error(transparent)]
@@ -288,12 +290,6 @@ and can not be more than 511 bytes.", .document_id.to_string()
DocumentEditionCompilationError(rhai::ParseError), DocumentEditionCompilationError(rhai::ParseError),
#[error("{0}")] #[error("{0}")]
DocumentEmbeddingError(String), DocumentEmbeddingError(String),
#[error("Upgrade could not be processed because v{0}.{1}.{2} of the database is too old. Please re-open the v{0}.{1}.{2} and use a dump to upgrade your version. The oldest version meilisearch can upgrade from is v1.12.0.")]
TooOldForUpgrade(u32, u32, u32),
#[error("Upgrade could not be processed because the database version (v{0}.{1}.{2}) is newer than the targeted version (v{VERSION_MAJOR}.{VERSION_MINOR}.{VERSION_PATCH})")]
CannotDowngrade(u32, u32, u32),
#[error("Cannot upgrade to unknown version v{0}.{1}.{2}.")]
CannotUpgradeToUnknownVersion(u32, u32, u32),
} }
impl From<crate::vector::Error> for Error { impl From<crate::vector::Error> for Error {

View File

@@ -1,47 +1,17 @@
use crate::constants::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
use crate::progress::{Progress, VariableNameStep}; use crate::progress::{Progress, VariableNameStep};
use crate::{Index, Result, UserError}; use crate::{Index, InternalError, Result};
pub fn upgrade(index: &Index, base_version: (u32, u32, u32), progress: Progress) -> Result<()> { pub fn upgrade(index: &Index, progress: Progress) -> Result<()> {
let wtxn = index.env.write_txn()?; let wtxn = index.env.write_txn()?;
let from = index.get_version(&wtxn)?; let from = index.get_version(&wtxn)?;
let upgrade_functions = let upgrade_functions =
[(v1_12_to_v1_13 as fn(&Index, Progress) -> Result<()>, "Upgrading from v1.12 to v1.13")]; [(v1_12_to_v1_13 as fn(&Index, Progress) -> Result<()>, "Upgrading from v1.12 to v1.13")];
let current_major: u32 = VERSION_MAJOR.parse().unwrap();
let current_minor: u32 = VERSION_MINOR.parse().unwrap();
let current_patch: u32 = VERSION_PATCH.parse().unwrap();
let start = match from { let start = match from {
// If there was no version it means we're coming from the base version specified by the index-scheduler // If there was no version it means we're coming from the base version specified by the index-scheduler
None if base_version.0 == 1 && base_version.1 == 12 => 0, None | Some((1, 12, _)) => 0,
Some((1, 12, _)) => 0,
// --- Error handling
None => {
return Err(UserError::TooOldForUpgrade(
base_version.0,
base_version.1,
base_version.2,
)
.into());
}
Some((major, minor, patch)) if major == 0 || (major == 1 && minor < 12) => {
return Err(UserError::TooOldForUpgrade(major, minor, patch).into());
}
Some((major, minor, patch)) if major > current_major => {
return Err(UserError::CannotDowngrade(major, minor, patch).into());
}
Some((major, minor, patch)) if major == current_major && minor > current_minor => {
return Err(UserError::CannotDowngrade(major, minor, patch).into());
}
Some((major, minor, patch))
if major == current_major && minor == current_minor && patch > current_patch =>
{
return Err(UserError::CannotDowngrade(major, minor, patch).into());
}
Some((major, minor, patch)) => { Some((major, minor, patch)) => {
return Err(UserError::CannotUpgradeToUnknownVersion(major, minor, patch).into()) return Err(InternalError::CannotUpgradeToVersion(major, minor, patch).into())
} }
}; };