Compare commits


2 Commits

Author  SHA1        Message                                         Date
Tamo    e017986dde  wip                                             2025-01-27 18:22:07 +01:00
Tamo    ad0765ffa4  Start working on the batch import in the dump   2025-01-20 12:44:21 +01:00
6 changed files with 155 additions and 8 deletions

View File

@@ -1,7 +1,8 @@
 #![allow(clippy::type_complexity)]
 #![allow(clippy::wrong_self_convention)]
-use meilisearch_types::batches::BatchId;
+use meilisearch_types::batch_view::BatchView;
+use meilisearch_types::batches::{Batch, BatchId};
 use meilisearch_types::error::ResponseError;
 use meilisearch_types::keys::Key;
 use meilisearch_types::milli::update::IndexDocumentsMethod;
@@ -91,6 +92,13 @@ pub struct TaskDump {
     pub finished_at: Option<OffsetDateTime>,
 }
 
+#[derive(Debug, Serialize, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct BatchDump {
+    pub original: Batch,
+    pub tasks: RoaringBitmap,
+}
+
 // A `Kind` specific version made for the dump. If modified you may break the dump.
 #[derive(Debug, PartialEq, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
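
Note: the new `BatchDump` is serialized one record per line into `batches/queue.jsonl` (see the writer further down). A minimal, self-contained sketch of the resulting line shape, using `u32`/`Vec<u32>` as stand-ins for the real `Batch` and `RoaringBitmap` types:

use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct BatchDumpSketch {
    // Stand-in for `original: Batch`; only a uid is kept in this sketch.
    batch_uid: u32,
    // Stand-in for `tasks: RoaringBitmap`.
    tasks: Vec<u32>,
}

fn main() -> serde_json::Result<()> {
    let dump = BatchDumpSketch { batch_uid: 0, tasks: vec![0, 1, 2] };
    // `rename_all = "camelCase"` turns `batch_uid` into `batchUid` on disk,
    // printing: {"batchUid":0,"tasks":[0,1,2]}
    println!("{}", serde_json::to_string(&dump)?);
    Ok(())
}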

View File

@@ -101,6 +101,16 @@ impl DumpReader {
         }
     }
 
+    pub fn batches(&mut self) -> Result<Box<dyn Iterator<Item = Result<v6::Batch>> + '_>> {
+        match self {
+            DumpReader::Current(current) => Ok(current.dumps()),
+            // There were no batches in previous dump versions.
+            DumpReader::Compat(_compat) => {
+                Ok(Box::new(std::iter::empty()) as Box<dyn Iterator<Item = Result<v6::Batch>> + '_>)
+            }
+        }
+    }
+
     pub fn keys(&mut self) -> Result<Box<dyn Iterator<Item = Result<v6::Key>> + '_>> {
         match self {
             DumpReader::Current(current) => Ok(current.keys()),
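
The `Compat` arm relies on boxing to unify iterator types: an empty iterator and a real one are erased to the same trait object, so older dumps simply yield nothing instead of erroring. A small standalone sketch of that pattern, independent of the Meilisearch types:

fn numbers(current_version: bool) -> Box<dyn Iterator<Item = u32>> {
    if current_version {
        // A real iterator for the current format.
        Box::new(vec![1, 2, 3].into_iter())
    } else {
        // Older formats have nothing to yield, so return an empty iterator
        // of the same boxed type instead of an error.
        Box::new(std::iter::empty())
    }
}

fn main() {
    assert_eq!(numbers(true).count(), 3);
    assert_eq!(numbers(false).count(), 0);
}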

View File

@@ -18,6 +18,7 @@ pub type Checked = meilisearch_types::settings::Checked;
 pub type Unchecked = meilisearch_types::settings::Unchecked;
 pub type Task = crate::TaskDump;
+pub type Batch = crate::BatchDump;
 pub type Key = meilisearch_types::keys::Key;
 pub type RuntimeTogglableFeatures = meilisearch_types::features::RuntimeTogglableFeatures;
@@ -48,6 +49,7 @@ pub struct V6Reader {
     instance_uid: Option<Uuid>,
     metadata: Metadata,
     tasks: BufReader<File>,
+    batches: BufReader<File>,
     keys: BufReader<File>,
     features: Option<RuntimeTogglableFeatures>,
 }
@@ -82,6 +84,7 @@ impl V6Reader {
             metadata: serde_json::from_reader(&*meta_file)?,
             instance_uid,
             tasks: BufReader::new(File::open(dump.path().join("tasks").join("queue.jsonl"))?),
+            batches: BufReader::new(File::open(dump.path().join("batches").join("queue.jsonl"))?),
             keys: BufReader::new(File::open(dump.path().join("keys.jsonl"))?),
             features,
             dump,
@@ -124,7 +127,7 @@ impl V6Reader {
         &mut self,
     ) -> Box<dyn Iterator<Item = Result<(Task, Option<Box<super::UpdateFile>>)>> + '_> {
         Box::new((&mut self.tasks).lines().map(|line| -> Result<_> {
-            let task: Task = serde_json::from_str(&line?).unwrap();
+            let task: Task = serde_json::from_str(&line?)?;
 
             let update_file_path = self
                 .dump
@@ -145,6 +148,14 @@
         }))
     }
 
+    pub fn dumps(&mut self) -> Box<dyn Iterator<Item = Result<Batch>> + '_> {
+        Box::new(
+            (&mut self.batches)
+                .lines()
+                .map(|line| -> Result<_> { Ok(serde_json::from_str(&line?)?) }),
+        )
+    }
+
     pub fn keys(&mut self) -> Box<dyn Iterator<Item = Result<Key>> + '_> {
         Box::new(
             (&mut self.keys).lines().map(|line| -> Result<_> { Ok(serde_json::from_str(&line?)?) }),
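
For reference, the `lines()`-plus-`from_str` pattern shared by `dumps` and `keys` streams one JSON document per line without loading the whole file. A standalone sketch of it over an in-memory buffer:

use std::io::{BufRead, BufReader, Cursor};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Two JSONL records, as they would appear in `queue.jsonl`.
    let reader = BufReader::new(Cursor::new("{\"uid\":0}\n{\"uid\":1}\n"));
    for line in reader.lines() {
        // Each line is deserialized independently, and only as the
        // iterator is consumed.
        let record: serde_json::Value = serde_json::from_str(&line?)?;
        println!("{record}");
    }
    Ok(())
}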

View File

@@ -4,6 +4,7 @@ use std::path::PathBuf;
 use flate2::write::GzEncoder;
 use flate2::Compression;
+use meilisearch_types::batch_view::BatchView;
 use meilisearch_types::features::RuntimeTogglableFeatures;
 use meilisearch_types::keys::Key;
 use meilisearch_types::settings::{Checked, Settings};
@@ -13,7 +14,7 @@ use time::OffsetDateTime;
 use uuid::Uuid;
 
 use crate::reader::Document;
-use crate::{IndexMetadata, Metadata, Result, TaskDump, CURRENT_DUMP_VERSION};
+use crate::{BatchDump, IndexMetadata, Metadata, Result, TaskDump, CURRENT_DUMP_VERSION};
 
 pub struct DumpWriter {
     dir: TempDir,
@@ -54,6 +55,10 @@ impl DumpWriter {
         TaskWriter::new(self.dir.path().join("tasks"))
     }
 
+    pub fn create_batches_queue(&self) -> Result<BatchWriter> {
+        BatchWriter::new(self.dir.path().join("batches"))
+    }
+
     pub fn create_experimental_features(&self, features: RuntimeTogglableFeatures) -> Result<()> {
         Ok(std::fs::write(
             self.dir.path().join("experimental-features.json"),
@@ -156,6 +161,31 @@ impl UpdateFile {
     }
 }
 
+pub struct BatchWriter {
+    queue: BufWriter<File>,
+}
+
+impl BatchWriter {
+    pub(crate) fn new(path: PathBuf) -> Result<Self> {
+        std::fs::create_dir(&path)?;
+        let queue = File::create(path.join("queue.jsonl"))?;
+        Ok(BatchWriter { queue: BufWriter::new(queue) })
+    }
+
+    /// Pushes batches in the dump.
+    /// The batches don't contain any private information, so we don't need a special type as we do for the tasks.
+    pub fn push_batch(&mut self, batch: &BatchDump) -> Result<()> {
+        self.queue.write_all(&serde_json::to_vec(batch)?)?;
+        self.queue.write_all(b"\n")?;
+        Ok(())
+    }
+
+    pub fn flush(mut self) -> Result<()> {
+        self.queue.flush()?;
+        Ok(())
+    }
+}
+
 pub struct IndexWriter {
     documents: BufWriter<File>,
     settings: File,
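
The new `BatchWriter` mirrors the JSONL conventions of the existing task queue writer: a `BufWriter` over `queue.jsonl`, one serialized record per line, flushed on teardown. A self-contained sketch of that pattern (the temp directory and the inlined JSON record are made up for illustration):

use std::fs::File;
use std::io::{BufWriter, Write};

fn main() -> std::io::Result<()> {
    let dir = std::env::temp_dir().join("batches-sketch");
    std::fs::create_dir_all(&dir)?;

    // One buffered writer over `queue.jsonl`, one record per line.
    let mut queue = BufWriter::new(File::create(dir.join("queue.jsonl"))?);
    for uid in 0..3u32 {
        // The real code serializes a `BatchDump` with `serde_json::to_vec`.
        queue.write_all(format!("{{\"uid\":{uid}}}").as_bytes())?;
        queue.write_all(b"\n")?;
    }
    // Flushing by value consumes the writer, mirroring `BatchWriter::flush`.
    queue.flush()?;
    Ok(())
}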

View File

@@ -17,7 +17,7 @@ tasks individually, but should be much faster since we are only performing
 one indexing operation.
 */
 
-use std::collections::{BTreeSet, HashMap, HashSet};
+use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
 use std::ffi::OsStr;
 use std::fmt;
 use std::fs::{self, File};
@@ -26,7 +26,8 @@ use std::sync::atomic::Ordering;
 use bumpalo::collections::CollectIn;
 use bumpalo::Bump;
-use dump::IndexMetadata;
+use dump::{BatchDump, IndexMetadata};
+use meilisearch_types::batch_view::BatchView;
 use meilisearch_types::batches::BatchId;
 use meilisearch_types::heed::{RoTxn, RwTxn};
 use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey};
@@ -790,7 +791,8 @@ impl IndexScheduler {
         let rtxn = self.env.read_txn()?;
 
-        // 2. dump the tasks
+        // 2. dump the queue
+        // 2.1. dump the tasks
         progress.update_progress(DumpCreationProgress::DumpTheTasks);
         let mut dump_tasks = dump.create_tasks_queue()?;
@@ -821,7 +823,7 @@
             }
 
             let mut dump_content_file = dump_tasks.push_task(&t.into())?;
 
-            // 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
+            // 2.1.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
             if let Some(content_file) = content_file {
                 if self.must_stop_processing.get() {
                     return Err(Error::AbortedTask);
@@ -851,6 +853,44 @@
         }
         dump_tasks.flush()?;
 
+        // 2.2. dump the batches
+        let mut dump_batches = dump.create_batches_queue()?;
+
+        let (atomic, update_batch_progress) =
+            AtomicBatchStep::new(self.all_batches.len(&rtxn)? as u32);
+        progress.update_progress(update_batch_progress);
+
+        for ret in self.all_batches.iter(&rtxn)? {
+            if self.must_stop_processing.get() {
+                return Err(Error::AbortedTask);
+            }
+
+            let (_, mut batch) = ret?;
+
+            // When we're dumping ourselves, the batch must be marked as finished
+            // so that we don't loop over ourselves indefinitely on import.
+            if task.batch_uid == Some(batch.uid) {
+                let finished_at = OffsetDateTime::now_utc();
+
+                // We fake the dates because we don't know yet whether everything will go well,
+                // but we need to dump the batch as finished and successful.
+                // If something fails, everything will be set appropriately in the end.
+                batch.progress = None;
+                let mut statuses = BTreeMap::new();
+                statuses.insert(Status::Succeeded, 1);
+                batch.stats.status = statuses;
+                batch.started_at = started_at;
+                batch.finished_at = Some(finished_at);
+            }
+
+            // A missing or empty batch shouldn't exist, but if it happens we just skip it.
+            if let Some(tasks) = self.batch_to_tasks_mapping.get(&rtxn, &batch.uid)? {
+                if !tasks.is_empty() {
+                    dump_batches.push_batch(&BatchDump { original: batch, tasks })?;
+                }
+            }
+            atomic.fetch_add(1, Ordering::Relaxed);
+        }
+        dump_batches.flush()?;
+
         // 3. Dump the indexes
         progress.update_progress(DumpCreationProgress::DumpTheIndexes);
         let nb_indexes = self.index_mapper.index_mapping.len(&rtxn)? as u32;
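
The self-referential fix-up above is the subtle part of this hunk: the batch currently executing the dump task can never be observed as finished from inside itself, so its stats and dates are forged before writing. A toy sketch of that guard, with the types simplified and all names hypothetical:

use std::collections::BTreeMap;

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Status {
    Succeeded,
}

struct BatchSketch {
    uid: u32,
    statuses: BTreeMap<Status, u32>,
    finished: bool,
}

fn main() {
    // The uid of the batch that is running this very dump task.
    let current_batch_uid = Some(7u32);
    let mut batch = BatchSketch { uid: 7, statuses: BTreeMap::new(), finished: false };

    if current_batch_uid == Some(batch.uid) {
        // Forge the outcome: one succeeded task, marked finished.
        batch.statuses.insert(Status::Succeeded, 1);
        batch.finished = true;
    }
    assert!(batch.finished);
    assert_eq!(batch.statuses.len(), 1);
}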

View File

@@ -43,12 +43,13 @@ use std::sync::atomic::{AtomicBool, AtomicU32};
 use std::sync::{Arc, RwLock};
 use std::time::Duration;
 
-use dump::{KindDump, TaskDump, UpdateFile};
+use dump::{BatchDump, KindDump, TaskDump, UpdateFile};
 pub use error::Error;
 pub use features::RoFeatures;
 use file_store::FileStore;
 use flate2::bufread::GzEncoder;
 use flate2::Compression;
+use meilisearch_types::batch_view::BatchView;
 use meilisearch_types::batches::{Batch, BatchId};
 use meilisearch_types::error::ResponseError;
 use meilisearch_types::features::{InstanceTogglableFeatures, RuntimeTogglableFeatures};
@@ -1994,6 +1995,12 @@ pub struct Dump<'a> {
     indexes: HashMap<String, RoaringBitmap>,
     statuses: HashMap<Status, RoaringBitmap>,
     kinds: HashMap<Kind, RoaringBitmap>,
+
+    batch_indexes: HashMap<String, RoaringBitmap>,
+    batch_statuses: HashMap<Status, RoaringBitmap>,
+    batch_kinds: HashMap<Kind, RoaringBitmap>,
+    batch_to_tasks_mapping: HashMap<TaskId, RoaringBitmap>,
 }
 
 impl<'a> Dump<'a> {
@@ -2007,6 +2014,10 @@ impl<'a> Dump<'a> {
             indexes: HashMap::new(),
             statuses: HashMap::new(),
             kinds: HashMap::new(),
+            batch_indexes: HashMap::new(),
+            batch_statuses: HashMap::new(),
+            batch_kinds: HashMap::new(),
+            batch_to_tasks_mapping: HashMap::new(),
         })
     }
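
Each dumped task is re-indexed into these maps with the `entry(..).or_default().insert(..)` pattern, which the next hunk extends to `batch_to_tasks_mapping`. A standalone sketch of that pattern using the `roaring` crate:

use std::collections::HashMap;

use roaring::RoaringBitmap;

fn main() {
    let mut batch_to_tasks: HashMap<u32, RoaringBitmap> = HashMap::new();
    // Insert task uids 10 and 11 under batch uid 0; the bitmap is created
    // on first access thanks to `or_default`.
    for task_uid in [10u32, 11] {
        batch_to_tasks.entry(0).or_default().insert(task_uid);
    }
    assert_eq!(batch_to_tasks[&0].len(), 2);
}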
@@ -2158,9 +2169,46 @@
         self.statuses.entry(task.status).or_default().insert(task.uid);
         self.kinds.entry(task.kind.as_kind()).or_default().insert(task.uid);
+        if let Some(batch_uid) = task.batch_uid {
+            self.batch_to_tasks_mapping.entry(batch_uid).or_default().insert(task.uid);
+        }
 
         Ok(task)
     }
 
+    /// Register a new batch coming from a dump in the scheduler.
+    /// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running.
+    pub fn register_dumped_batch(&mut self, dump: BatchDump) -> Result<()> {
+        let BatchDump { original, tasks } = dump;
+        self.index_scheduler.all_batches.put(&mut self.wtxn, &original.uid, &original)?;
+        self.index_scheduler.batch_to_tasks_mapping.put(&mut self.wtxn, &original.uid, &tasks)?;
+
+        utils::insert_task_datetime(
+            &mut self.wtxn,
+            self.index_scheduler.batch_enqueued_at,
+            original.started_at, // TODO: retrieve the enqueued_at from the dump
+            original.uid,
+        )?;
+        utils::insert_task_datetime(
+            &mut self.wtxn,
+            self.index_scheduler.batch_started_at,
+            original.started_at,
+            original.uid,
+        )?;
+        if let Some(finished_at) = original.finished_at {
+            utils::insert_task_datetime(
+                &mut self.wtxn,
+                self.index_scheduler.batch_finished_at,
+                finished_at,
+                original.uid,
+            )?;
+        }
+
+        Ok(())
+    }
+
     /// Commit all the changes and exit the importing dump state
     pub fn finish(mut self) -> Result<()> {
         for (index, bitmap) in self.indexes {