mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-12-09 22:25:44 +00:00
Switch to migration-oriented dumpless upgrade
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
use anyhow::bail;
|
||||
use meilisearch_types::heed::{Env, RwTxn, WithoutTls};
|
||||
use meilisearch_types::tasks::{Details, KindWithContent, Status, Task};
|
||||
use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
|
||||
use meilisearch_types::versioning;
|
||||
use time::OffsetDateTime;
|
||||
use tracing::info;
|
||||
|
||||
@@ -9,83 +9,82 @@ use crate::queue::TaskQueue;
|
||||
use crate::versioning::Versioning;
|
||||
|
||||
trait UpgradeIndexScheduler {
|
||||
fn upgrade(
|
||||
&self,
|
||||
env: &Env<WithoutTls>,
|
||||
wtxn: &mut RwTxn,
|
||||
original: (u32, u32, u32),
|
||||
) -> anyhow::Result<()>;
|
||||
fn target_version(&self) -> (u32, u32, u32);
|
||||
fn upgrade(&self, env: &Env<WithoutTls>, wtxn: &mut RwTxn) -> anyhow::Result<()>;
|
||||
/// Whether the migration should be applied, depending on the initial version of the index scheduler before
|
||||
/// any migration was applied
|
||||
fn must_upgrade(&self, initial_version: (u32, u32, u32)) -> bool;
|
||||
/// A progress-centric description of the migration
|
||||
fn description(&self) -> &'static str;
|
||||
}
|
||||
|
||||
/// Upgrade the index scheduler to the binary version.
|
||||
///
|
||||
/// # Warning
|
||||
///
|
||||
/// The current implementation uses a single wtxn to the index scheduler for the whole duration of the upgrade.
|
||||
/// If migrations start taking take a long time, it might prevent tasks from being registered.
|
||||
/// If this issue manifests, then it can be mitigated by adding a `fn target_version` to `UpgradeIndexScheduler`,
|
||||
/// to be able to write intermediate versions and drop the wtxn between applying migrations.
|
||||
pub fn upgrade_index_scheduler(
|
||||
env: &Env<WithoutTls>,
|
||||
versioning: &Versioning,
|
||||
from: (u32, u32, u32),
|
||||
to: (u32, u32, u32),
|
||||
initial_version: (u32, u32, u32),
|
||||
) -> anyhow::Result<()> {
|
||||
let current_major = to.0;
|
||||
let current_minor = to.1;
|
||||
let current_patch = to.2;
|
||||
let target_major: u32 = versioning::VERSION_MAJOR;
|
||||
let target_minor: u32 = versioning::VERSION_MINOR;
|
||||
let target_patch: u32 = versioning::VERSION_PATCH;
|
||||
let target_version = (target_major, target_minor, target_patch);
|
||||
|
||||
let upgrade_functions: &[&dyn UpgradeIndexScheduler] = &[
|
||||
// This is the last upgrade function, it will be called when the index is up to date.
|
||||
// any other upgrade function should be added before this one.
|
||||
&ToCurrentNoOp {},
|
||||
];
|
||||
|
||||
let start = match from {
|
||||
(1, 12, _) => 0,
|
||||
(1, 13, _) => 0,
|
||||
(1, 14, _) => 0,
|
||||
(1, 15, _) => 0,
|
||||
(1, 16, _) => 0,
|
||||
(1, 17, _) => 0,
|
||||
(1, 18, _) => 0,
|
||||
(1, 19, _) => 0,
|
||||
(1, 20, _) => 0,
|
||||
(1, 21, _) => 0,
|
||||
(1, 22, _) => 0,
|
||||
(1, 23, _) => 0,
|
||||
(1, 24, _) => 0,
|
||||
(1, 25, _) => 0,
|
||||
(1, 26, _) => 0,
|
||||
(1, 27, _) => 0,
|
||||
(1, 28, _) => 0,
|
||||
(major, minor, patch) => {
|
||||
if major > current_major
|
||||
|| (major == current_major && minor > current_minor)
|
||||
|| (major == current_major && minor == current_minor && patch > current_patch)
|
||||
{
|
||||
bail!(
|
||||
"Database version {major}.{minor}.{patch} is higher than the Meilisearch version {current_major}.{current_minor}.{current_patch}. Downgrade is not supported",
|
||||
);
|
||||
} else if major < 1 || (major == current_major && minor < 12) {
|
||||
bail!(
|
||||
"Database version {major}.{minor}.{patch} is too old for the experimental dumpless upgrade feature. Please generate a dump using the v{major}.{minor}.{patch} and import it in the v{current_major}.{current_minor}.{current_patch}",
|
||||
);
|
||||
} else {
|
||||
bail!("Unknown database version: v{major}.{minor}.{patch}");
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
info!("Upgrading the task queue");
|
||||
let mut local_from = from;
|
||||
for upgrade in upgrade_functions[start..].iter() {
|
||||
let target = upgrade.target_version();
|
||||
info!(
|
||||
"Upgrading from v{}.{}.{} to v{}.{}.{}",
|
||||
local_from.0, local_from.1, local_from.2, target.0, target.1, target.2
|
||||
);
|
||||
let mut wtxn = env.write_txn()?;
|
||||
upgrade.upgrade(env, &mut wtxn, local_from)?;
|
||||
versioning.set_version(&mut wtxn, target)?;
|
||||
wtxn.commit()?;
|
||||
local_from = target;
|
||||
if initial_version == target_version {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let upgrade_functions: &[&dyn UpgradeIndexScheduler] = &[
|
||||
// List all upgrade functions to apply in order here.
|
||||
];
|
||||
|
||||
let (initial_major, initial_minor, initial_patch) = initial_version;
|
||||
|
||||
if initial_version > target_version {
|
||||
bail!(
|
||||
"Database version {initial_major}.{initial_minor}.{initial_patch} is higher than the Meilisearch version {target_major}.{target_minor}.{target_patch}. Downgrade is not supported",
|
||||
);
|
||||
}
|
||||
|
||||
if initial_version < (1, 12, 0) {
|
||||
bail!(
|
||||
"Database version {initial_major}.{initial_minor}.{initial_patch} is too old for the experimental dumpless upgrade feature. Please generate a dump using the v{initial_major}.{initial_minor}.{initial_patch} and import it in the v{target_major}.{target_minor}.{target_patch}",
|
||||
);
|
||||
}
|
||||
|
||||
info!("Upgrading the task queue");
|
||||
let mut wtxn = env.write_txn()?;
|
||||
let migration_count = upgrade_functions.len();
|
||||
for (migration_index, upgrade) in upgrade_functions.iter().enumerate() {
|
||||
if upgrade.must_upgrade(initial_version) {
|
||||
info!(
|
||||
"[{migration_index}/{migration_count}]Applying migration: {}",
|
||||
upgrade.description()
|
||||
);
|
||||
|
||||
upgrade.upgrade(env, &mut wtxn)?;
|
||||
|
||||
info!(
|
||||
"[{}/{migration_count}]Migration applied: {}",
|
||||
migration_index + 1,
|
||||
upgrade.description()
|
||||
)
|
||||
} else {
|
||||
info!(
|
||||
"[{migration_index}/{migration_count}]Skipping unnecessary migration: {}",
|
||||
upgrade.description()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
versioning.set_version(&mut wtxn, target_version)?;
|
||||
info!("Task queue upgraded, spawning the upgrade database task");
|
||||
|
||||
let queue = TaskQueue::new(env, &mut wtxn)?;
|
||||
let uid = queue.next_task_id(&wtxn)?;
|
||||
queue.register(
|
||||
@@ -98,9 +97,9 @@ pub fn upgrade_index_scheduler(
|
||||
finished_at: None,
|
||||
error: None,
|
||||
canceled_by: None,
|
||||
details: Some(Details::UpgradeDatabase { from, to }),
|
||||
details: Some(Details::UpgradeDatabase { from: initial_version, to: target_version }),
|
||||
status: Status::Enqueued,
|
||||
kind: KindWithContent::UpgradeDatabase { from },
|
||||
kind: KindWithContent::UpgradeDatabase { from: initial_version },
|
||||
network: None,
|
||||
custom_metadata: None,
|
||||
},
|
||||
@@ -109,21 +108,3 @@ pub fn upgrade_index_scheduler(
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
struct ToCurrentNoOp {}
|
||||
|
||||
impl UpgradeIndexScheduler for ToCurrentNoOp {
|
||||
fn upgrade(
|
||||
&self,
|
||||
_env: &Env<WithoutTls>,
|
||||
_wtxn: &mut RwTxn,
|
||||
_original: (u32, u32, u32),
|
||||
) -> anyhow::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn target_version(&self) -> (u32, u32, u32) {
|
||||
(VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user