	Merge pull request #5523 from meilisearch/rollback-updates
Allow rollbacking updates
@@ -2,6 +2,7 @@ use std::fmt::Display;

use meilisearch_types::batches::BatchId;
use meilisearch_types::error::{Code, ErrorCode};
use meilisearch_types::milli::index::RollbackOutcome;
use meilisearch_types::tasks::{Kind, Status};
use meilisearch_types::{heed, milli};
use thiserror::Error;

@@ -150,8 +151,24 @@ pub enum Error {
    CorruptedTaskQueue,
    #[error(transparent)]
    DatabaseUpgrade(Box<Self>),
    #[error("Failed to rollback for index `{index}`: {rollback_outcome} ")]
    RollbackFailed { index: String, rollback_outcome: RollbackOutcome },
    #[error(transparent)]
    UnrecoverableError(Box<Self>),
    #[error("The index scheduler is in version v{}.{}.{}, but Meilisearch is in version v{}.{}.{}.\n  - hint: start the correct version of Meilisearch, or consider updating your database. See also <https://www.meilisearch.com/docs/learn/update_and_migration/updating>",
    index_scheduler_version.0, index_scheduler_version.1, index_scheduler_version.2,
    package_version.0, package_version.1, package_version.2)]
    IndexSchedulerVersionMismatch {
        index_scheduler_version: (u32, u32, u32),
        package_version: (u32, u32, u32),
    },
    #[error("Index `{index}` is in version v{}.{}.{}, but Meilisearch is in version v{}.{}.{}.\n  - note: this is an internal error, please consider filing a bug report: <https://github.com/meilisearch/meilisearch/issues/new?template=bug_report.md>",
    index_version.0, index_version.1, index_version.2, package_version.0, package_version.1, package_version.2)]
    IndexVersionMismatch {
        index: String,
        index_version: (u32, u32, u32),
        package_version: (u32, u32, u32),
    },
    #[error(transparent)]
    HeedTransaction(heed::Error),

@@ -209,6 +226,9 @@ impl Error {
            | Error::CorruptedTaskQueue
            | Error::DatabaseUpgrade(_)
            | Error::UnrecoverableError(_)
            | Error::IndexSchedulerVersionMismatch { .. }
            | Error::IndexVersionMismatch { .. }
            | Error::RollbackFailed { .. }
            | Error::HeedTransaction(_) => false,
            #[cfg(test)]
            Error::PlannedFailure => false,

@@ -274,7 +294,10 @@ impl ErrorCode for Error {
            Error::CorruptedTaskQueue => Code::Internal,
            Error::CorruptedDump => Code::Internal,
            Error::DatabaseUpgrade(_) => Code::Internal,
            Error::RollbackFailed { .. } => Code::Internal,
            Error::UnrecoverableError(_) => Code::Internal,
            Error::IndexSchedulerVersionMismatch { .. } => Code::Internal,
            Error::IndexVersionMismatch { .. } => Code::Internal,
            Error::CreateBatch(_) => Code::Internal,

            // This one should never be seen by the end user
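As a quick illustration of the message the new `IndexSchedulerVersionMismatch` variant produces, here is a minimal standalone sketch: the struct and hand-written `Display` impl below are invented for this example and do not reuse the crate's `thiserror` derive.

use std::fmt;

// Minimal stand-in for the new error variant; the real enum derives `thiserror::Error`.
struct IndexSchedulerVersionMismatch {
    index_scheduler_version: (u32, u32, u32),
    package_version: (u32, u32, u32),
}

impl fmt::Display for IndexSchedulerVersionMismatch {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let (s0, s1, s2) = self.index_scheduler_version;
        let (p0, p1, p2) = self.package_version;
        write!(
            f,
            "The index scheduler is in version v{s0}.{s1}.{s2}, but Meilisearch is in version v{p0}.{p1}.{p2}.\n  - hint: start the correct version of Meilisearch, or consider updating your database."
        )
    }
}

fn main() {
    let err = IndexSchedulerVersionMismatch {
        index_scheduler_version: (1, 12, 0),
        package_version: (1, 14, 0),
    };
    // What an operator would see when the scheduler database lags behind the binary.
    println!("{err}");
}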
@@ -7,6 +7,7 @@ use meilisearch_types::heed::types::{SerdeJson, Str};
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls};
use meilisearch_types::milli;
use meilisearch_types::milli::database_stats::DatabaseStats;
use meilisearch_types::milli::index::RollbackOutcome;
use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::milli::{FieldDistribution, Index};
use serde::{Deserialize, Serialize};

@@ -431,6 +432,51 @@ impl IndexMapper {
        Ok(index)
    }

    pub fn rollback_index(
        &self,
        rtxn: &RoTxn,
        name: &str,
        to: (u32, u32, u32),
    ) -> Result<RollbackOutcome> {
        // remove any currently updating index to make sure that we aren't keeping a reference to the index somewhere
        drop(self.currently_updating_index.write().unwrap().take());

        let uuid = self
            .index_mapping
            .get(rtxn, name)?
            .ok_or_else(|| Error::IndexNotFound(name.to_string()))?;

        // take the lock to make sure no one is messing with the indexes while we roll back
        // this will block any search or other operation, but we are rolling back so this is probably acceptable.
        let mut index_map = self.index_map.write().unwrap();

        'close_index: loop {
            match index_map.get(&uuid) {
                Available(_) => {
                    index_map.close_for_resize(&uuid, self.enable_mdb_writemap, 0);
                    // index should now be `Closing`; try again
                    continue;
                }
                // index already closed
                Missing => break 'close_index,
                // closing requested by this thread or another one; wait for closing to complete, then exit
                Closing(closing_index) => {
                    if closing_index.wait_timeout(Duration::from_secs(100)).is_none() {
                        // release the lock so it doesn't get poisoned
                        drop(index_map);
                        panic!("cannot close index")
                    }
                    break;
                }
                BeingDeleted => return Err(Error::IndexNotFound(name.to_string())),
            };
        }

        let index_path = self.base_path.join(uuid.to_string());
        Index::rollback(milli::heed::EnvOpenOptions::new().read_txn_without_tls(), index_path, to)
            .map_err(|err| crate::Error::from_milli(err, Some(name.to_string())))
    }

    /// Attempts `f` for each index that exists in the index mapper.
    ///
    /// It is preferable to use this function rather than a loop that opens all indexes, as a way to avoid having all indexes opened,
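The labeled loop above is what guarantees the LMDB environment is really closed before `Index::rollback` touches the files on disk. Here is a standalone sketch of that pattern, with illustrative names and a boolean standing in for the real closing-event handle.

// Sketch of the close-then-wait loop used by `rollback_index` (illustrative only).
#[derive(Clone, Copy)]
enum SlotState {
    Available,
    Closing { finished: bool }, // stand-in for the real "wait for closing" handle
    Missing,
    BeingDeleted,
}

fn wait_until_closed(mut slot: SlotState) -> Result<(), &'static str> {
    'close_index: loop {
        match slot {
            SlotState::Available => {
                // request a close, then loop again to observe the `Closing` state
                slot = SlotState::Closing { finished: true };
                continue;
            }
            // already closed: nothing to wait for
            SlotState::Missing => break 'close_index,
            SlotState::Closing { finished } => {
                if !finished {
                    // the real code waits on the closing event with a 100s timeout
                    return Err("cannot close index");
                }
                break;
            }
            SlotState::BeingDeleted => return Err("index not found"),
        }
    }
    Ok(())
}

fn main() {
    assert!(wait_until_closed(SlotState::Available).is_ok());
    assert!(wait_until_closed(SlotState::BeingDeleted).is_err());
}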
@@ -41,11 +41,8 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
    let mut snap = String::new();

    let indx_sched_version = version.get_version(&rtxn).unwrap();
    let latest_version = (
        versioning::VERSION_MAJOR.parse().unwrap(),
        versioning::VERSION_MINOR.parse().unwrap(),
        versioning::VERSION_PATCH.parse().unwrap(),
    );
    let latest_version =
        (versioning::VERSION_MAJOR, versioning::VERSION_MINOR, versioning::VERSION_PATCH);
    if indx_sched_version != Some(latest_version) {
        snap.push_str(&format!("index scheduler running on version {indx_sched_version:?}\n"));
    }
@@ -398,9 +398,9 @@ impl IndexScheduler {
                        Ok(Ok(TickOutcome::StopProcessingForever)) => break,
                        Ok(Err(e)) => {
                            tracing::error!("{e}");
                            // Wait one second when an irrecoverable error occurs.
                            // Wait when an irrecoverable error occurs.
                            if !e.is_recoverable() {
                                std::thread::sleep(Duration::from_secs(1));
                                std::thread::sleep(Duration::from_secs(10));
                            }
                        }
                        Err(_panic) => {
@@ -74,6 +74,7 @@ make_enum_progress! {
make_enum_progress! {
    pub enum TaskCancelationProgress {
        RetrievingTasks,
        CancelingUpgrade,
        UpdatingTasks,
    }
}
@@ -423,7 +423,8 @@ impl IndexScheduler {
    }

    /// Create the next batch to be processed;
    /// 1. We get the *last* task to cancel.
    /// 0. We get the *last* task to cancel.
    /// 1. We get the tasks to upgrade.
    /// 2. We get the *next* task to delete.
    /// 3. We get the *next* snapshot to process.
    /// 4. We get the *next* dump to process.

@@ -443,7 +444,20 @@ impl IndexScheduler {
        let count_total_enqueued = enqueued.len();
        let failed = &self.queue.tasks.get_status(rtxn, Status::Failed)?;

        // 0. The priority over everything is to upgrade the instance
        // 0. we get the last task to cancel.
        let to_cancel = self.queue.tasks.get_kind(rtxn, Kind::TaskCancelation)? & enqueued;
        if let Some(task_id) = to_cancel.max() {
            let mut task =
                self.queue.tasks.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
            current_batch.processing(Some(&mut task));
            current_batch.reason(BatchStopReason::TaskCannotBeBatched {
                kind: Kind::TaskCancelation,
                id: task_id,
            });
            return Ok(Some((Batch::TaskCancelation { task }, current_batch)));
        }

        // 1. We upgrade the instance
        // There shouldn't be multiple upgrade tasks but just in case we're going to batch all of them at the same time
        let upgrade = self.queue.tasks.get_kind(rtxn, Kind::UpgradeDatabase)? & (enqueued | failed);
        if !upgrade.is_empty() {

@@ -459,17 +473,21 @@ impl IndexScheduler {
            return Ok(Some((Batch::UpgradeDatabase { tasks }, current_batch)));
        }

        // 1. we get the last task to cancel.
        let to_cancel = self.queue.tasks.get_kind(rtxn, Kind::TaskCancelation)? & enqueued;
        if let Some(task_id) = to_cancel.max() {
            let mut task =
                self.queue.tasks.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
            current_batch.processing(Some(&mut task));
            current_batch.reason(BatchStopReason::TaskCannotBeBatched {
                kind: Kind::TaskCancelation,
                id: task_id,
            });
            return Ok(Some((Batch::TaskCancelation { task }, current_batch)));
        // check the version of the scheduler here.
        // if the version is not the current, refuse to batch any additional task.
        let version = self.version.get_version(rtxn)?;
        let package_version = (
            meilisearch_types::versioning::VERSION_MAJOR,
            meilisearch_types::versioning::VERSION_MINOR,
            meilisearch_types::versioning::VERSION_PATCH,
        );
        if version != Some(package_version) {
            return Err(Error::UnrecoverableError(Box::new(
                Error::IndexSchedulerVersionMismatch {
                    index_scheduler_version: version.unwrap_or((1, 12, 0)),
                    package_version,
                },
            )));
        }

        // 2. we get the next task to delete
@@ -6,7 +6,8 @@ use meilisearch_types::batches::{BatchEnqueuedAt, BatchId};
use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::milli::{self, ChannelCongestion};
use meilisearch_types::tasks::{Details, IndexSwap, KindWithContent, Status, Task};
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
use milli::update::Settings as MilliSettings;
use roaring::RoaringBitmap;

@@ -144,11 +145,22 @@ impl IndexScheduler {
                    self.index_mapper.index(&rtxn, &index_uid)?
                };

                let mut index_wtxn = index.write_txn()?;

                let index_version = index.get_version(&index_wtxn)?.unwrap_or((1, 12, 0));
                let package_version = (VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH);
                if index_version != package_version {
                    return Err(Error::IndexVersionMismatch {
                        index: index_uid,
                        index_version,
                        package_version,
                    });
                }

                // the index operation can take a long time, so save this handle to make it available to the search for the duration of the tick
                self.index_mapper
                    .set_currently_updating_index(Some((index_uid.clone(), index.clone())));

                let mut index_wtxn = index.write_txn()?;
                let pre_commit_dabases_sizes = index.database_sizes(&index_wtxn)?;
                let (tasks, congestion) =
                    self.apply_index_operation(&mut index_wtxn, &index, op, &progress)?;

@@ -353,9 +365,11 @@ impl IndexScheduler {
                let KindWithContent::UpgradeDatabase { from } = tasks.last().unwrap().kind else {
                    unreachable!();
                };

                let ret = catch_unwind(AssertUnwindSafe(|| self.process_upgrade(from, progress)));
                match ret {
                    Ok(Ok(())) => (),
                    Ok(Err(Error::AbortedTask)) => return Err(Error::AbortedTask),
                    Ok(Err(e)) => return Err(Error::DatabaseUpgrade(Box::new(e))),
                    Err(e) => {
                        let msg = match e.downcast_ref::<&'static str>() {

@@ -653,17 +667,79 @@ impl IndexScheduler {
        progress: &Progress,
    ) -> Result<Vec<Task>> {
        progress.update_progress(TaskCancelationProgress::RetrievingTasks);
        let mut tasks_to_cancel = RoaringBitmap::new();

        let enqueued_tasks = &self.queue.tasks.get_status(rtxn, Status::Enqueued)?;

        // 0. Check if any upgrade task was matched.
        //    If so, we cancel all the failed or enqueued upgrade tasks.
        let upgrade_tasks = &self.queue.tasks.get_kind(rtxn, Kind::UpgradeDatabase)?;
        let is_canceling_upgrade = !matched_tasks.is_disjoint(upgrade_tasks);
        if is_canceling_upgrade {
            let failed_tasks = self.queue.tasks.get_status(rtxn, Status::Failed)?;
            tasks_to_cancel |= upgrade_tasks & (enqueued_tasks | failed_tasks);
        }
        // 1. Remove from this list the tasks that we are not allowed to cancel
        //    Notice that only the _enqueued_ ones are cancelable and we should
        //    have already aborted the indexation of the _processing_ ones
        let cancelable_tasks = self.queue.tasks.get_status(rtxn, Status::Enqueued)?;
        let tasks_to_cancel = cancelable_tasks & matched_tasks;
        tasks_to_cancel |= enqueued_tasks & matched_tasks;

        // 2. If we're canceling an upgrade, attempt the rollback
        if let Some(latest_upgrade_task) = (&tasks_to_cancel & upgrade_tasks).max() {
            progress.update_progress(TaskCancelationProgress::CancelingUpgrade);

            let task = self.queue.tasks.get_task(rtxn, latest_upgrade_task)?.unwrap();
            let Some(Details::UpgradeDatabase { from, to }) = task.details else {
                unreachable!("wrong details for upgrade task {latest_upgrade_task}")
            };

            // check that we are rolling back an upgrade to the current Meilisearch
            let bin_major: u32 = meilisearch_types::versioning::VERSION_MAJOR;
            let bin_minor: u32 = meilisearch_types::versioning::VERSION_MINOR;
            let bin_patch: u32 = meilisearch_types::versioning::VERSION_PATCH;

            if to == (bin_major, bin_minor, bin_patch) {
                tracing::warn!(
                    "Rollbacking from v{}.{}.{} to v{}.{}.{}",
                    to.0,
                    to.1,
                    to.2,
                    from.0,
                    from.1,
                    from.2
                );
                match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    self.process_rollback(from, progress)
                })) {
                    Ok(Ok(())) => {}
                    Ok(Err(err)) => return Err(Error::DatabaseUpgrade(Box::new(err))),
                    Err(e) => {
                        let msg = match e.downcast_ref::<&'static str>() {
                            Some(s) => *s,
                            None => match e.downcast_ref::<String>() {
                                Some(s) => &s[..],
                                None => "Box<dyn Any>",
                            },
                        };
                        return Err(Error::DatabaseUpgrade(Box::new(Error::ProcessBatchPanicked(
                            msg.to_string(),
                        ))));
                    }
                }
            } else {
                tracing::debug!(
                    "Not rollbacking an upgrade targetting the earlier version v{}.{}.{}",
                    bin_major,
                    bin_minor,
                    bin_patch
                )
            }
        }

        // 3. We now have a list of tasks to cancel, cancel them
        let (task_progress, progress_obj) = AtomicTaskStep::new(tasks_to_cancel.len() as u32);
        progress.update_progress(progress_obj);

        // 2. We now have a list of tasks to cancel, cancel them
        let mut tasks = self.queue.tasks.get_existing_tasks(
            rtxn,
            tasks_to_cancel.iter().inspect(|_| {
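The `catch_unwind` arm above recovers a human-readable message from the panic payload before wrapping it in `Error::ProcessBatchPanicked`. Here is a self-contained sketch of that extraction; the helper name is invented for this example.

use std::panic::{catch_unwind, AssertUnwindSafe};

// Recover a printable message from a panic payload, as the scheduler does above
// before wrapping it in an internal error (helper name invented for this sketch).
fn panic_message(payload: Box<dyn std::any::Any + Send>) -> String {
    match payload.downcast_ref::<&'static str>() {
        Some(s) => (*s).to_string(),
        None => match payload.downcast_ref::<String>() {
            Some(s) => s.clone(),
            None => "Box<dyn Any>".to_string(),
        },
    }
}

fn main() {
    // Silence the default panic hook so only the extracted message matters here.
    std::panic::set_hook(Box::new(|_| {}));
    let payload = catch_unwind(AssertUnwindSafe(|| {
        panic!("rollback went wrong");
    }))
    .unwrap_err();
    assert_eq!(panic_message(payload), "rollback went wrong");
}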
@@ -12,10 +12,14 @@ impl IndexScheduler {
        #[cfg(test)]
        self.maybe_fail(crate::test_utils::FailureLocation::ProcessUpgrade)?;

        enum UpgradeIndex {}
        let indexes = self.index_names()?;

        for (i, uid) in indexes.iter().enumerate() {
            let must_stop_processing = self.scheduler.must_stop_processing.clone();

            if must_stop_processing.get() {
                return Err(Error::AbortedTask);
            }
            progress.update_progress(VariableNameStep::<UpgradeIndex>::new(
                format!("Upgrading index `{uid}`"),
                i as u32,

@@ -27,6 +31,7 @@ impl IndexScheduler {
                &mut index_wtxn,
                &index,
                db_version,
                || must_stop_processing.get(),
                progress.clone(),
            )
            .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;

@@ -46,4 +51,42 @@ impl IndexScheduler {

        Ok(())
    }

    pub fn process_rollback(&self, db_version: (u32, u32, u32), progress: &Progress) -> Result<()> {
        let mut wtxn = self.env.write_txn()?;
        tracing::info!(?db_version, "roll back index scheduler version");
        self.version.set_version(&mut wtxn, db_version)?;
        let db_path = self.scheduler.version_file_path.parent().unwrap();
        wtxn.commit()?;

        let indexes = self.index_names()?;

        tracing::info!("roll backing all indexes");
        for (i, uid) in indexes.iter().enumerate() {
            progress.update_progress(VariableNameStep::<UpgradeIndex>::new(
                format!("Rollbacking index `{uid}`"),
                i as u32,
                indexes.len() as u32,
            ));
            let index_schd_rtxn = self.env.read_txn()?;

            let rollback_outcome =
                self.index_mapper.rollback_index(&index_schd_rtxn, uid, db_version)?;
            if !rollback_outcome.succeeded() {
                return Err(crate::Error::RollbackFailed { index: uid.clone(), rollback_outcome });
            }
        }

        tracing::info!(?db_path, ?db_version, "roll back version file");
        meilisearch_types::versioning::create_version_file(
            db_path,
            db_version.0,
            db_version.1,
            db_version.2,
        )?;

        Ok(())
    }
}

enum UpgradeIndex {}
@@ -114,12 +114,8 @@ impl IndexScheduler {
            auto_upgrade: true, // Don't cost much and will ensure the happy path works
            embedding_cache_cap: 10,
        };
        let version = configuration(&mut options).unwrap_or_else(|| {
            (
                versioning::VERSION_MAJOR.parse().unwrap(),
                versioning::VERSION_MINOR.parse().unwrap(),
                versioning::VERSION_PATCH.parse().unwrap(),
            )
        let version = configuration(&mut options).unwrap_or({
            (versioning::VERSION_MAJOR, versioning::VERSION_MINOR, versioning::VERSION_PATCH)
        });

        std::fs::create_dir_all(&options.auth_path).unwrap();
@@ -104,10 +104,6 @@ impl UpgradeIndexScheduler for ToCurrentNoOp {
    }

    fn target_version(&self) -> (u32, u32, u32) {
        (
            VERSION_MAJOR.parse().unwrap(),
            VERSION_MINOR.parse().unwrap(),
            VERSION_PATCH.parse().unwrap(),
        )
        (VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH)
    }
}

@@ -39,9 +39,9 @@ impl Versioning {
    }

    pub fn set_current_version(&self, wtxn: &mut RwTxn) -> Result<(), heed::Error> {
        let major = versioning::VERSION_MAJOR.parse().unwrap();
        let minor = versioning::VERSION_MINOR.parse().unwrap();
        let patch = versioning::VERSION_PATCH.parse().unwrap();
        let major = versioning::VERSION_MAJOR;
        let minor = versioning::VERSION_MINOR;
        let patch = versioning::VERSION_PATCH;
        self.set_version(wtxn, (major, minor, patch))
    }

@@ -64,9 +64,9 @@ impl Versioning {
        };
        wtxn.commit()?;

        let bin_major: u32 = versioning::VERSION_MAJOR.parse().unwrap();
        let bin_minor: u32 = versioning::VERSION_MINOR.parse().unwrap();
        let bin_patch: u32 = versioning::VERSION_PATCH.parse().unwrap();
        let bin_major: u32 = versioning::VERSION_MAJOR;
        let bin_minor: u32 = versioning::VERSION_MINOR;
        let bin_patch: u32 = versioning::VERSION_PATCH;
        let to = (bin_major, bin_minor, bin_patch);

        if from != to {
@@ -272,9 +272,9 @@ impl KindWithContent {
            KindWithContent::UpgradeDatabase { from } => Some(Details::UpgradeDatabase {
                from: (from.0, from.1, from.2),
                to: (
                    versioning::VERSION_MAJOR.parse().unwrap(),
                    versioning::VERSION_MINOR.parse().unwrap(),
                    versioning::VERSION_PATCH.parse().unwrap(),
                    versioning::VERSION_MAJOR,
                    versioning::VERSION_MINOR,
                    versioning::VERSION_PATCH,
                ),
            }),
        }

@@ -338,9 +338,9 @@ impl KindWithContent {
            KindWithContent::UpgradeDatabase { from } => Some(Details::UpgradeDatabase {
                from: *from,
                to: (
                    versioning::VERSION_MAJOR.parse().unwrap(),
                    versioning::VERSION_MINOR.parse().unwrap(),
                    versioning::VERSION_PATCH.parse().unwrap(),
                    versioning::VERSION_MAJOR,
                    versioning::VERSION_MINOR,
                    versioning::VERSION_PATCH,
                ),
            }),
        }

@@ -386,9 +386,9 @@ impl From<&KindWithContent> for Option<Details> {
            KindWithContent::UpgradeDatabase { from } => Some(Details::UpgradeDatabase {
                from: *from,
                to: (
                    versioning::VERSION_MAJOR.parse().unwrap(),
                    versioning::VERSION_MINOR.parse().unwrap(),
                    versioning::VERSION_PATCH.parse().unwrap(),
                    versioning::VERSION_MAJOR,
                    versioning::VERSION_MINOR,
                    versioning::VERSION_PATCH,
                ),
            }),
        }
@@ -8,9 +8,7 @@ use tempfile::NamedTempFile;
/// The name of the file that contains the version of the database.
pub const VERSION_FILE_NAME: &str = "VERSION";

pub static VERSION_MAJOR: &str = env!("CARGO_PKG_VERSION_MAJOR");
pub static VERSION_MINOR: &str = env!("CARGO_PKG_VERSION_MINOR");
pub static VERSION_PATCH: &str = env!("CARGO_PKG_VERSION_PATCH");
pub use milli::constants::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};

/// Persists the version of the current Meilisearch binary to a VERSION file
pub fn create_current_version_file(db_path: &Path) -> anyhow::Result<()> {

@@ -19,9 +17,9 @@ pub fn create_current_version_file(db_path: &Path) -> anyhow::Result<()> {

pub fn create_version_file(
    db_path: &Path,
    major: &str,
    minor: &str,
    patch: &str,
    major: u32,
    minor: u32,
    patch: u32,
) -> anyhow::Result<()> {
    let version_path = db_path.join(VERSION_FILE_NAME);
    // In order to persist the file later we must create it in the `data.ms` and not in `/tmp`
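For context, a simplified and hypothetical sketch of what the new `u32`-based signature ends up writing: a plain `MAJOR.MINOR.PATCH` string in the `VERSION` file (the same format the downgrade test further down writes by hand). The real `create_version_file` goes through a named temporary file before persisting.

use std::io::Write;
use std::path::Path;

const VERSION_FILE_NAME: &str = "VERSION";

// Hypothetical simplified writer; the real function persists via a temp file.
fn write_version_file(db_path: &Path, major: u32, minor: u32, patch: u32) -> std::io::Result<()> {
    let version_path = db_path.join(VERSION_FILE_NAME);
    let mut file = std::fs::File::create(version_path)?;
    write!(file, "{major}.{minor}.{patch}")
}

fn main() -> std::io::Result<()> {
    let dir = std::env::temp_dir();
    write_version_file(&dir, 1, 14, 0)?;
    println!("wrote {}", dir.join(VERSION_FILE_NAME).display());
    Ok(())
}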
@@ -235,10 +235,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
        auto_upgrade: opt.experimental_dumpless_upgrade,
        embedding_cache_cap: opt.experimental_embedding_cache_entries,
    };
    let bin_major: u32 = VERSION_MAJOR.parse().unwrap();
    let bin_minor: u32 = VERSION_MINOR.parse().unwrap();
    let bin_patch: u32 = VERSION_PATCH.parse().unwrap();
    let binary_version = (bin_major, bin_minor, bin_patch);
    let binary_version = (VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH);

    let empty_db = is_empty_db(&opt.db_path);
    let (index_scheduler, auth_controller) = if let Some(ref snapshot_path) = opt.import_snapshot {
@@ -54,7 +54,7 @@ async fn version_requires_downgrade() {
    std::fs::create_dir_all(&db_path).unwrap();
    let major = meilisearch_types::versioning::VERSION_MAJOR;
    let minor = meilisearch_types::versioning::VERSION_MINOR;
    let patch = meilisearch_types::versioning::VERSION_PATCH.parse::<u32>().unwrap() + 1;
    let patch = meilisearch_types::versioning::VERSION_PATCH + 1;
    std::fs::write(db_path.join("VERSION"), format!("{major}.{minor}.{patch}")).unwrap();
    let options = Opt { experimental_dumpless_upgrade: true, ..default_settings };
    let err = Server::new_with_options(options).await.map(|_| ()).unwrap_err();
@@ -49,15 +49,10 @@ impl OfflineUpgrade {
        const LAST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.12.7";

        let upgrade_list = [
            (
                v1_9_to_v1_10 as fn(&Path, u32, u32, u32) -> Result<(), anyhow::Error>,
                "1",
                "10",
                "0",
            ),
            (v1_10_to_v1_11, "1", "11", "0"),
            (v1_11_to_v1_12, "1", "12", "0"),
            (v1_12_to_v1_12_3, "1", "12", "3"),
            (v1_9_to_v1_10 as fn(&Path, u32, u32, u32) -> Result<(), anyhow::Error>, 1, 10, 0),
            (v1_10_to_v1_11, 1, 11, 0),
            (v1_11_to_v1_12, 1, 12, 0),
            (v1_12_to_v1_12_3, 1, 12, 3),
        ];

        let no_upgrade: usize = upgrade_list.len();

@@ -95,13 +90,8 @@ impl OfflineUpgrade {

        if start_at == no_upgrade {
            println!("No upgrade operation to perform, writing VERSION file");
            create_version_file(
                &self.db_path,
                &target_major.to_string(),
                &target_minor.to_string(),
                &target_patch.to_string(),
            )
            .context("while writing VERSION file after the upgrade")?;
            create_version_file(&self.db_path, target_major, target_minor, target_patch)
                .context("while writing VERSION file after the upgrade")?;
            println!("Success");
            return Ok(());
        }
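For reference, a hypothetical, self-contained sketch of how a `(function, major, minor, patch)` upgrade list like the one above can be walked from the first migration that still applies; the no-op migration and all names are invented for the example.

use std::path::Path;

type UpgradeFn = fn(&Path, u32, u32, u32) -> Result<(), String>;

// Placeholder migration standing in for the real v1_x_to_v1_y functions.
fn noop_upgrade(_db: &Path, major: u32, minor: u32, patch: u32) -> Result<(), String> {
    println!("upgrading to v{major}.{minor}.{patch}");
    Ok(())
}

fn main() -> Result<(), String> {
    let upgrade_list: [(UpgradeFn, u32, u32, u32); 2] =
        [(noop_upgrade, 1, 10, 0), (noop_upgrade, 1, 11, 0)];
    let db_path = Path::new("data.ms");
    let start_at = 1; // index of the first migration that still needs to run
    for (upgrade, major, minor, patch) in upgrade_list.iter().skip(start_at) {
        upgrade(db_path, *major, *minor, *patch)?;
    }
    Ok(())
}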
@@ -1,6 +1,13 @@
pub static VERSION_MAJOR: &str = env!("CARGO_PKG_VERSION_MAJOR");
pub static VERSION_MINOR: &str = env!("CARGO_PKG_VERSION_MINOR");
pub static VERSION_PATCH: &str = env!("CARGO_PKG_VERSION_PATCH");
pub const VERSION_MAJOR: u32 = parse_u32(env!("CARGO_PKG_VERSION_MAJOR"));
pub const VERSION_MINOR: u32 = parse_u32(env!("CARGO_PKG_VERSION_MINOR"));
pub const VERSION_PATCH: u32 = parse_u32(env!("CARGO_PKG_VERSION_PATCH"));

const fn parse_u32(s: &str) -> u32 {
    match u32::from_str_radix(s, 10) {
        Ok(version) => version,
        Err(_) => panic!("could not parse as u32"),
    }
}

pub const RESERVED_VECTORS_FIELD_NAME: &str = "_vectors";
pub const RESERVED_GEO_FIELD_NAME: &str = "_geo";
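Because the version parts are now `u32` constants rather than `&str`s, comparisons no longer need `.parse().unwrap()` at every call site and can happen in const context. A standalone sketch reusing the same `parse_u32` helper shown above; it assumes a toolchain where `u32::from_str_radix` is usable in a `const fn`, as the diff itself requires.

// Mirrors the constants above (standalone sketch, not the crate's module).
const fn parse_u32(s: &str) -> u32 {
    match u32::from_str_radix(s, 10) {
        Ok(version) => version,
        Err(_) => panic!("could not parse as u32"),
    }
}

const VERSION_MAJOR: u32 = parse_u32(env!("CARGO_PKG_VERSION_MAJOR"));
const VERSION_MINOR: u32 = parse_u32(env!("CARGO_PKG_VERSION_MINOR"));
const VERSION_PATCH: u32 = parse_u32(env!("CARGO_PKG_VERSION_PATCH"));

fn main() {
    // A database version read from disk is now compared as plain integers.
    let db_version: (u32, u32, u32) = (1, 12, 0);
    let binary_version = (VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH);
    println!("needs upgrade: {}", db_version < binary_version);
}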
										
											
File diff suppressed because it is too large.
@@ -1,4 +0,0 @@
---
source: milli/src/index.rs
---
[0, ]

@@ -1,4 +0,0 @@
---
source: milli/src/index.rs
---
[]

@@ -0,0 +1,4 @@
---
source: crates/milli/src/test_index.rs
---
[0, ]

@@ -0,0 +1,4 @@
---
source: crates/milli/src/test_index.rs
---
[]
crates/milli/src/test_index.rs (new file, 1399 changed lines): file diff suppressed because it is too large.
@@ -24,12 +24,16 @@ trait UpgradeIndex {
}

/// Return true if the cached stats of the index must be regenerated
pub fn upgrade(
pub fn upgrade<MSP>(
    wtxn: &mut RwTxn,
    index: &Index,
    db_version: (u32, u32, u32),
    must_stop_processing: MSP,
    progress: Progress,
) -> Result<bool> {
) -> Result<bool>
where
    MSP: Fn() -> bool + Sync,
{
    let from = index.get_version(wtxn)?.unwrap_or(db_version);
    let upgrade_functions: &[&dyn UpgradeIndex] = &[
        &V1_12_To_V1_12_3 {},

@@ -59,6 +63,9 @@ pub fn upgrade(
    let mut current_version = from;
    let mut regenerate_stats = false;
    for (i, upgrade) in upgrade_path.iter().enumerate() {
        if (must_stop_processing)() {
            return Err(crate::Error::InternalError(InternalError::AbortedIndexation));
        }
        let target = upgrade.target_version();
        progress.update_progress(VariableNameStep::<UpgradeVersion>::new(
            format!(
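The new `MSP: Fn() -> bool + Sync` parameter lets the scheduler interrupt a long index upgrade between steps. A self-contained sketch of the pattern; the function and step names are illustrative, not the crate's API.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// The scheduler passes `|| must_stop_processing.get()`; here an AtomicBool stands in.
fn run_steps<MSP>(steps: &[&str], must_stop_processing: MSP) -> Result<(), String>
where
    MSP: Fn() -> bool + Sync,
{
    for step in steps {
        // poll the flag between steps so an abort request is honoured promptly
        if must_stop_processing() {
            return Err(format!("aborted before step {step}"));
        }
        println!("running step {step}");
    }
    Ok(())
}

fn main() {
    let stop = Arc::new(AtomicBool::new(false));
    let stop_flag = Arc::clone(&stop);
    let result = run_steps(&["upgrade to v1.13", "upgrade to v1.14"], move || {
        stop_flag.load(Ordering::Relaxed)
    });
    assert!(result.is_ok());
}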
@@ -1,7 +1,6 @@
use heed::RwTxn;

use super::UpgradeIndex;
use crate::constants::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
use crate::database_stats::DatabaseStats;
use crate::progress::Progress;
use crate::{make_enum_progress, Index, Result};

@@ -51,10 +50,6 @@ impl UpgradeIndex for V1_13_1_To_Latest_V1_13 {
    }

    fn target_version(&self) -> (u32, u32, u32) {
        (
            VERSION_MAJOR.parse().unwrap(),
            VERSION_MINOR.parse().unwrap(),
            VERSION_PATCH.parse().unwrap(),
        )
        (1, 13, 3)
    }
}