make an upgrade module where we'll be able to shove each version instead of putting everything in the same file

This commit is contained in:
Tamo
2024-10-28 11:57:02 +01:00
parent 22229d3046
commit 362836efb7
4 changed files with 430 additions and 423 deletions

View File

@@ -2,7 +2,7 @@ use std::fs::{read_dir, read_to_string, remove_file, File};
use std::io::BufWriter;
use std::path::PathBuf;
use anyhow::{bail, Context};
use anyhow::Context;
use clap::{Parser, Subcommand};
use dump::{DumpWriter, IndexMetadata};
use file_store::FileStore;
@@ -10,15 +10,16 @@ use meilisearch_auth::AuthController;
use meilisearch_types::heed::types::{SerdeJson, Str};
use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified};
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
use meilisearch_types::milli::index::{db_name, main_key};
use meilisearch_types::milli::{obkv_to_json, BEU32};
use meilisearch_types::tasks::{Status, Task};
use meilisearch_types::versioning::{create_version_file, get_version, parse_version};
use meilisearch_types::versioning::{get_version, parse_version};
use meilisearch_types::Index;
use time::macros::format_description;
use time::OffsetDateTime;
use upgrade::OfflineUpgrade;
use uuid_codec::UuidCodec;
mod upgrade;
mod uuid_codec;
#[derive(Parser)]
@@ -72,7 +73,7 @@ enum Command {
///
/// Supported upgrade paths:
///
/// - v1.9.0 -> v1.10.0
/// - v1.9.0 -> v1.10.0 -> v1.11.0
OfflineUpgrade {
#[arg(long)]
target_version: String,
@@ -96,425 +97,6 @@ fn main() -> anyhow::Result<()> {
}
}
struct OfflineUpgrade {
db_path: PathBuf,
current_version: (String, String, String),
target_version: (String, String, String),
}
impl OfflineUpgrade {
fn upgrade(self) -> anyhow::Result<()> {
// TODO: if we make this process support more versions, introduce a more flexible way of checking for the version
// currently only supports v1.9 to v1.10
let (current_major, current_minor, current_patch) = &self.current_version;
match (current_major.as_str(), current_minor.as_str(), current_patch.as_str()) {
("1", "9", _) => {}
_ => {
bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from v1.9")
}
}
let (target_major, target_minor, target_patch) = &self.target_version;
match (target_major.as_str(), target_minor.as_str(), target_patch.as_str()) {
("1", "10", _) => {}
_ => {
bail!("Unsupported target version {target_major}.{target_minor}.{target_patch}. Can only upgrade to v1.10")
}
}
println!("Upgrading from {current_major}.{current_minor}.{current_patch} to {target_major}.{target_minor}.{target_patch}");
self.v1_9_to_v1_10()?;
println!("Writing VERSION file");
create_version_file(&self.db_path, target_major, target_minor, target_patch)
.context("while writing VERSION file after the upgrade")?;
println!("Success");
Ok(())
}
fn v1_9_to_v1_10(&self) -> anyhow::Result<()> {
// 2 changes here
// 1. date format. needs to be done before opening the Index
// 2. REST embedders. We don't support this case right now, so bail
let index_scheduler_path = self.db_path.join("tasks");
let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
.with_context(|| {
format!("While trying to open {:?}", index_scheduler_path.display())
})?;
let mut sched_wtxn = env.write_txn()?;
let index_mapping: Database<Str, UuidCodec> =
try_opening_database(&env, &sched_wtxn, "index-mapping")?;
let index_stats: Database<UuidCodec, Unspecified> =
try_opening_database(&env, &sched_wtxn, "index-stats").with_context(|| {
format!("While trying to open {:?}", index_scheduler_path.display())
})?;
let index_count =
index_mapping.len(&sched_wtxn).context("while reading the number of indexes")?;
// FIXME: not ideal, we have to pre-populate all indexes to prevent double borrow of sched_wtxn
// 1. immutably for the iteration
// 2. mutably for updating index stats
let indexes: Vec<_> = index_mapping
.iter(&sched_wtxn)?
.map(|res| res.map(|(uid, uuid)| (uid.to_owned(), uuid)))
.collect();
let mut rest_embedders = Vec::new();
let mut unwrapped_indexes = Vec::new();
// check that update can take place
for (index_index, result) in indexes.into_iter().enumerate() {
let (uid, uuid) = result?;
let index_path = self.db_path.join("indexes").join(uuid.to_string());
println!(
"[{}/{index_count}]Checking that update can take place for `{uid}` at `{}`",
index_index + 1,
index_path.display()
);
let index_env = unsafe {
// FIXME: fetch the 25 magic number from the index file
EnvOpenOptions::new().max_dbs(25).open(&index_path).with_context(|| {
format!("while opening index {uid} at '{}'", index_path.display())
})?
};
let index_txn = index_env.read_txn().with_context(|| {
format!(
"while obtaining a write transaction for index {uid} at {}",
index_path.display()
)
})?;
println!("\t- Checking for incompatible embedders (REST embedders)");
let rest_embedders_for_index = find_rest_embedders(&uid, &index_env, &index_txn)?;
if rest_embedders_for_index.is_empty() {
unwrapped_indexes.push((uid, uuid));
} else {
// no need to add to unwrapped indexes because we'll exit early
rest_embedders.push((uid, rest_embedders_for_index));
}
}
if !rest_embedders.is_empty() {
let rest_embedders = rest_embedders
.into_iter()
.flat_map(|(index, embedders)| std::iter::repeat(index.clone()).zip(embedders))
.map(|(index, embedder)| format!("\t- embedder `{embedder}` in index `{index}`"))
.collect::<Vec<_>>()
.join("\n");
bail!("The update cannot take place because there are REST embedder(s). Remove them before proceeding with the update:\n{rest_embedders}\n\n\
The database has not been modified and is still a valid v1.9 database.");
}
println!("Update can take place, updating");
for (index_index, (uid, uuid)) in unwrapped_indexes.into_iter().enumerate() {
let index_path = self.db_path.join("indexes").join(uuid.to_string());
println!(
"[{}/{index_count}]Updating index `{uid}` at `{}`",
index_index + 1,
index_path.display()
);
let index_env = unsafe {
// FIXME: fetch the 25 magic number from the index file
EnvOpenOptions::new().max_dbs(25).open(&index_path).with_context(|| {
format!("while opening index {uid} at '{}'", index_path.display())
})?
};
let mut index_wtxn = index_env.write_txn().with_context(|| {
format!(
"while obtaining a write transaction for index `{uid}` at `{}`",
index_path.display()
)
})?;
println!("\t- Updating index stats");
update_index_stats(index_stats, &uid, uuid, &mut sched_wtxn)?;
println!("\t- Updating date format");
update_date_format(&uid, &index_env, &mut index_wtxn)?;
index_wtxn.commit().with_context(|| {
format!(
"while committing the write txn for index `{uid}` at {}",
index_path.display()
)
})?;
}
sched_wtxn.commit().context("while committing the write txn for the index-scheduler")?;
println!("Upgrading database succeeded");
Ok(())
}
}
pub mod v1_9 {
pub type FieldDistribution = std::collections::BTreeMap<String, u64>;
/// The statistics that can be computed from an `Index` object.
#[derive(serde::Serialize, serde::Deserialize, Debug)]
pub struct IndexStats {
/// Number of documents in the index.
pub number_of_documents: u64,
/// Size taken up by the index' DB, in bytes.
///
/// This includes the size taken by both the used and free pages of the DB, and as the free pages
/// are not returned to the disk after a deletion, this number is typically larger than
/// `used_database_size` that only includes the size of the used pages.
pub database_size: u64,
/// Size taken by the used pages of the index' DB, in bytes.
///
/// As the DB backend does not return to the disk the pages that are not currently used by the DB,
/// this value is typically smaller than `database_size`.
pub used_database_size: u64,
/// Association of every field name with the number of times it occurs in the documents.
pub field_distribution: FieldDistribution,
/// Creation date of the index.
pub created_at: time::OffsetDateTime,
/// Date of the last update of the index.
pub updated_at: time::OffsetDateTime,
}
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize)]
pub struct IndexEmbeddingConfig {
pub name: String,
pub config: EmbeddingConfig,
}
#[derive(Debug, Clone, Default, serde::Deserialize, serde::Serialize)]
pub struct EmbeddingConfig {
/// Options of the embedder, specific to each kind of embedder
pub embedder_options: EmbedderOptions,
}
/// Options of an embedder, specific to each kind of embedder.
#[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
pub enum EmbedderOptions {
HuggingFace(hf::EmbedderOptions),
OpenAi(openai::EmbedderOptions),
Ollama(ollama::EmbedderOptions),
UserProvided(manual::EmbedderOptions),
Rest(rest::EmbedderOptions),
}
impl Default for EmbedderOptions {
fn default() -> Self {
Self::OpenAi(openai::EmbedderOptions { api_key: None, dimensions: None })
}
}
mod hf {
#[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
pub struct EmbedderOptions {
pub model: String,
pub revision: Option<String>,
}
}
mod openai {
#[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
pub struct EmbedderOptions {
pub api_key: Option<String>,
pub dimensions: Option<usize>,
}
}
mod ollama {
#[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
pub struct EmbedderOptions {
pub embedding_model: String,
pub url: Option<String>,
pub api_key: Option<String>,
}
}
mod manual {
#[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
pub struct EmbedderOptions {
pub dimensions: usize,
}
}
mod rest {
#[derive(Debug, Clone, PartialEq, Eq, serde::Deserialize, serde::Serialize, Hash)]
pub struct EmbedderOptions {
pub api_key: Option<String>,
pub dimensions: Option<usize>,
pub url: String,
pub input_field: Vec<String>,
// path to the array of embeddings
pub path_to_embeddings: Vec<String>,
// shape of a single embedding
pub embedding_object: Vec<String>,
}
}
pub type OffsetDateTime = time::OffsetDateTime;
}
pub mod v1_10 {
use crate::v1_9;
pub type FieldDistribution = std::collections::BTreeMap<String, u64>;
/// The statistics that can be computed from an `Index` object.
#[derive(serde::Serialize, serde::Deserialize, Debug)]
pub struct IndexStats {
/// Number of documents in the index.
pub number_of_documents: u64,
/// Size taken up by the index' DB, in bytes.
///
/// This includes the size taken by both the used and free pages of the DB, and as the free pages
/// are not returned to the disk after a deletion, this number is typically larger than
/// `used_database_size` that only includes the size of the used pages.
pub database_size: u64,
/// Size taken by the used pages of the index' DB, in bytes.
///
/// As the DB backend does not return to the disk the pages that are not currently used by the DB,
/// this value is typically smaller than `database_size`.
pub used_database_size: u64,
/// Association of every field name with the number of times it occurs in the documents.
pub field_distribution: FieldDistribution,
/// Creation date of the index.
#[serde(with = "time::serde::rfc3339")]
pub created_at: time::OffsetDateTime,
/// Date of the last update of the index.
#[serde(with = "time::serde::rfc3339")]
pub updated_at: time::OffsetDateTime,
}
impl From<v1_9::IndexStats> for IndexStats {
fn from(
v1_9::IndexStats {
number_of_documents,
database_size,
used_database_size,
field_distribution,
created_at,
updated_at,
}: v1_9::IndexStats,
) -> Self {
IndexStats {
number_of_documents,
database_size,
used_database_size,
field_distribution,
created_at,
updated_at,
}
}
}
#[derive(serde::Serialize, serde::Deserialize)]
#[serde(transparent)]
pub struct OffsetDateTime(#[serde(with = "time::serde::rfc3339")] pub time::OffsetDateTime);
}
fn update_index_stats(
index_stats: Database<UuidCodec, Unspecified>,
index_uid: &str,
index_uuid: uuid::Uuid,
sched_wtxn: &mut RwTxn,
) -> anyhow::Result<()> {
let ctx = || format!("while updating index stats for index `{index_uid}`");
let stats: Option<v1_9::IndexStats> = index_stats
.remap_data_type::<SerdeJson<v1_9::IndexStats>>()
.get(sched_wtxn, &index_uuid)
.with_context(ctx)?;
if let Some(stats) = stats {
let stats: v1_10::IndexStats = stats.into();
index_stats
.remap_data_type::<SerdeJson<v1_10::IndexStats>>()
.put(sched_wtxn, &index_uuid, &stats)
.with_context(ctx)?;
}
Ok(())
}
fn update_date_format(
index_uid: &str,
index_env: &Env,
index_wtxn: &mut RwTxn,
) -> anyhow::Result<()> {
let main = try_opening_poly_database(index_env, index_wtxn, db_name::MAIN)
.with_context(|| format!("while updating date format for index `{index_uid}`"))?;
date_round_trip(index_wtxn, index_uid, main, main_key::CREATED_AT_KEY)?;
date_round_trip(index_wtxn, index_uid, main, main_key::UPDATED_AT_KEY)?;
Ok(())
}
fn find_rest_embedders(
index_uid: &str,
index_env: &Env,
index_txn: &RoTxn,
) -> anyhow::Result<Vec<String>> {
let main = try_opening_poly_database(index_env, index_txn, db_name::MAIN)
.with_context(|| format!("while checking REST embedders for index `{index_uid}`"))?;
let mut rest_embedders = vec![];
for config in main
.remap_types::<Str, SerdeJson<Vec<v1_9::IndexEmbeddingConfig>>>()
.get(index_txn, main_key::EMBEDDING_CONFIGS)?
.unwrap_or_default()
{
if let v1_9::EmbedderOptions::Rest(_) = config.config.embedder_options {
rest_embedders.push(config.name);
}
}
Ok(rest_embedders)
}
fn date_round_trip(
wtxn: &mut RwTxn,
index_uid: &str,
db: Database<Unspecified, Unspecified>,
key: &str,
) -> anyhow::Result<()> {
let datetime =
db.remap_types::<Str, SerdeJson<v1_9::OffsetDateTime>>().get(wtxn, key).with_context(
|| format!("could not read `{key}` while updating date format for index `{index_uid}`"),
)?;
if let Some(datetime) = datetime {
db.remap_types::<Str, SerdeJson<v1_10::OffsetDateTime>>()
.put(wtxn, key, &v1_10::OffsetDateTime(datetime))
.with_context(|| {
format!(
"could not write `{key}` while updating date format for index `{index_uid}`"
)
})?;
}
Ok(())
}
/// Clears the task queue located at `db_path`.
fn clear_task_queue(db_path: PathBuf) -> anyhow::Result<()> {
let path = db_path.join("tasks");