Compare commits

...

6 Commits

Author SHA1 Message Date
Louis Dureuil
c885171029 process: add cancelling points in process snapshot 2025-10-02 17:16:05 +02:00
Louis Dureuil
3870a374af Compression: implement cancellation and change env copy method 2025-10-02 16:56:25 +02:00
Louis Dureuil
d41716d8f0 Add MustStopProcessing::as_lambda 2025-10-02 16:50:44 +02:00
Louis Dureuil
43a6505435 Use PipedArchiveBuilder to process snapshots without compaction 2025-10-02 11:18:54 +02:00
Louis Dureuil
467e15d9c0 WIP: Add PipedArchiveBuilder 2025-10-02 11:18:13 +02:00
Louis Dureuil
91275adb76 Add necessary accessors 2025-10-02 11:12:51 +02:00
6 changed files with 378 additions and 12 deletions

View File

@@ -33,6 +33,10 @@ impl FileStore {
std::fs::create_dir_all(&path)?;
Ok(FileStore { path })
}
pub fn path(&self) -> &Path {
&self.path
}
}
impl FileStore {

View File

@@ -1,4 +1,4 @@
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use std::{fs, thread};
@@ -591,4 +591,8 @@ impl IndexMapper {
pub fn set_currently_updating_index(&self, index: Option<(String, Index)>) {
*self.currently_updating_index.write().unwrap() = index;
}
pub fn base_path(&self) -> &Path {
&self.base_path
}
}

View File

@@ -50,6 +50,11 @@ impl MustStopProcessing {
pub fn reset(&self) {
self.0.store(false, Ordering::Relaxed);
}
pub fn as_lambda(&self) -> impl Fn() -> bool + Send + Sync + 'static {
let clone = self.clone();
move || clone.get()
}
}
pub struct Scheduler {

View File

@@ -4,6 +4,7 @@ use std::sync::atomic::Ordering;
use meilisearch_types::heed::CompactionOption;
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::milli::InternalError;
use meilisearch_types::tasks::{Status, Task};
use meilisearch_types::{compression, VERSION_FILE_NAME};
@@ -76,6 +77,22 @@ unsafe fn remove_tasks(
impl IndexScheduler {
pub(super) fn process_snapshot(
&self,
progress: Progress,
tasks: Vec<Task>,
) -> Result<Vec<Task>> {
let compaction_option = if self.scheduler.experimental_no_snapshot_compaction {
CompactionOption::Disabled
} else {
CompactionOption::Enabled
};
match compaction_option {
CompactionOption::Enabled => self.process_snapshot_with_temp(progress, tasks),
CompactionOption::Disabled => self.process_snapshot_with_pipe(progress, tasks),
}
}
fn process_snapshot_with_temp(
&self,
progress: Progress,
mut tasks: Vec<Task>,
@@ -105,12 +122,8 @@ impl IndexScheduler {
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
let dst = temp_snapshot_dir.path().join("tasks");
fs::create_dir_all(&dst)?;
let compaction_option = if self.scheduler.experimental_no_snapshot_compaction {
CompactionOption::Disabled
} else {
CompactionOption::Enabled
};
self.env.copy_to_path(dst.join("data.mdb"), compaction_option)?;
self.env.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)?;
// 2.2 Remove the current snapshot tasks
//
@@ -161,7 +174,7 @@ impl IndexScheduler {
let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string());
fs::create_dir_all(&dst)?;
index
.copy_to_path(dst.join("data.mdb"), compaction_option)
.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)
.map_err(|e| Error::from_milli(e, Some(name.to_string())))?;
}
@@ -171,7 +184,7 @@ impl IndexScheduler {
progress.update_progress(SnapshotCreationProgress::SnapshotTheApiKeys);
let dst = temp_snapshot_dir.path().join("auth");
fs::create_dir_all(&dst)?;
self.scheduler.auth_env.copy_to_path(dst.join("data.mdb"), compaction_option)?;
self.scheduler.auth_env.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)?;
// 5. Copy and tarball the flat snapshot
progress.update_progress(SnapshotCreationProgress::CreateTheTarball);
@@ -206,4 +219,139 @@ impl IndexScheduler {
Ok(tasks)
}
fn process_snapshot_with_pipe(
&self,
progress: Progress,
mut tasks: Vec<Task>,
) -> Result<Vec<Task>> {
progress.update_progress(SnapshotCreationProgress::StartTheSnapshotCreation);
let must_stop_processing = &self.scheduler.must_stop_processing;
let abort_no_index = Err(Error::from_milli(InternalError::AbortedIndexation.into(), None));
fs::create_dir_all(&self.scheduler.snapshots_path)?;
// 1. Find the base path and original name of the database
// TODO find a better way to get this path
let mut base_path = self.env.path().to_owned();
base_path.pop();
let base_path = base_path;
let db_name = base_path.file_name().and_then(OsStr::to_str).unwrap_or("data.ms");
// 2. Start the tarball builder. The tarball will be created on another thread from piped data.
let mut builder = compression::PipedArchiveBuilder::new(
self.scheduler.snapshots_path.clone(),
format!("{db_name}.snapshot"),
base_path,
must_stop_processing.as_lambda(),
);
// 3. Snapshot the VERSION file
builder.add_file_to_archive(self.scheduler.version_file_path.clone())?;
if must_stop_processing.get() {
return abort_no_index;
}
// 4. Snapshot the index-scheduler LMDB env
//
// When we call copy_to_path, LMDB opens a read transaction by itself,
// we can't provide our own. It is an issue as we would like to know
// the update files to copy but new ones can be enqueued between the copy
// of the env and the new transaction we open to retrieve the enqueued tasks.
// So we prefer opening a new transaction after copying the env and copy more
// update files than not enough.
//
// Note that there cannot be any update files deleted between those
// two read operations as the task processing is synchronous.
// 4.1 First copy the LMDB env of the index-scheduler
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
builder.add_env_to_archive(&self.env)?;
if must_stop_processing.get() {
return abort_no_index;
}
// 4.2 Create a read transaction on the index-scheduler
let rtxn = self.env.read_txn()?;
// 4.3 Only copy the update files of the enqueued tasks
progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles);
builder.add_dir_to_archive(self.queue.file_store.path().to_path_buf())?;
let enqueued = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32);
progress.update_progress(update_file_progress);
for task_id in enqueued {
if must_stop_processing.get() {
return abort_no_index;
}
let task =
self.queue.tasks.get_task(&rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
if let Some(content_uuid) = task.content_uuid() {
let src = self.queue.file_store.get_update_path(content_uuid);
builder.add_file_to_archive(src)?;
}
atomic.fetch_add(1, Ordering::Relaxed);
}
// 5. Snapshot every index
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexes);
builder.add_dir_to_archive(self.index_mapper.base_path().to_path_buf())?;
let index_mapping = self.index_mapper.index_mapping;
let nb_indexes = index_mapping.len(&rtxn)? as u32;
for (i, result) in index_mapping.iter(&rtxn)?.enumerate() {
let (name, _) = result?;
let abort_index = || {
Err(Error::from_milli(
InternalError::AbortedIndexation.into(),
Some(name.to_string()), // defer the `to_string`
))
};
if must_stop_processing.get() {
return abort_index();
}
progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
name, i as u32, nb_indexes,
));
let index = self.index_mapper.index(&rtxn, name)?;
builder.add_env_to_archive(index.raw_env())?;
}
drop(rtxn);
if must_stop_processing.get() {
return abort_no_index;
}
// 6. Snapshot the auth LMDB env
progress.update_progress(SnapshotCreationProgress::SnapshotTheApiKeys);
builder.add_env_to_archive(&self.scheduler.auth_env)?;
// 7. Finalize the tarball
progress.update_progress(SnapshotCreationProgress::CreateTheTarball);
let file = builder.finish()?;
// 8. Change the permission to make the snapshot readonly
let mut permissions = file.metadata()?.permissions();
permissions.set_readonly(true);
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
#[allow(clippy::non_octal_unix_permissions)]
// rwxrwxrwx
permissions.set_mode(0b100100100);
}
file.set_permissions(permissions)?;
for task in &mut tasks {
task.status = Status::Succeeded;
}
Ok(tasks)
}
}

View File

@@ -1,11 +1,17 @@
use std::fs::{create_dir_all, File};
use std::io::Write;
use std::path::Path;
use std::io::{PipeWriter, Read, Write};
use std::mem::ManuallyDrop;
use std::ops::DerefMut;
use std::os::fd::{AsRawFd, FromRawFd};
use std::path::{Path, PathBuf};
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
use std::thread::JoinHandle;
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;
use tar::{Archive, Builder};
use milli::heed::Env;
use tar::{Archive, Builder, Header};
pub fn to_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> {
let mut f = File::create(dest)?;
@@ -26,3 +32,197 @@ pub fn from_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Res
ar.unpack(&dest)?;
Ok(())
}
pub struct PipedArchiveBuilder {
send_compression: Sender<CompressionMessage>,
send_cancellation: Sender<CancellationMessage>,
processing_thread: JoinHandle<anyhow::Result<File>>,
cancellation_thread: JoinHandle<()>,
}
enum CompressionMessage {
Env { path: PathBuf, reader: std::io::PipeReader },
File { path: PathBuf },
Dir { path: PathBuf },
}
impl PipedArchiveBuilder {
pub fn new<F>(
dest_dir: PathBuf,
dest_filename: String,
base_path: PathBuf,
must_stop_processing: F,
) -> Self
where
F: Fn() -> bool + Send + 'static,
{
let (send_compression, recv) = std::sync::mpsc::channel();
let processing_thread = std::thread::Builder::new()
.name("piped-archive-builder".into())
.spawn(|| Self::run_processing(dest_dir, dest_filename, recv, base_path))
.unwrap();
let (send_cancellation, recv) = std::sync::mpsc::channel();
let cancellation_thread = std::thread::Builder::new()
.name("piped-archive-builder-cancellation".into())
.spawn(|| Self::run_cancellation(must_stop_processing, recv))
.unwrap();
Self { send_compression, send_cancellation, processing_thread, cancellation_thread }
}
pub fn add_env_to_archive<T>(&mut self, env: &Env<T>) -> anyhow::Result<()> {
let (reader, writer) = std::io::pipe()?;
let path = env.path().to_path_buf();
// make sure that the environment cannot change while it is being added to the archive,
// as any concurrent change would corrupt the copy.
let env_wtxn = env.write_txn()?;
// SAFETY: only the cancellation thread has the actual responsibility of closing the pipe since
// the clone is `ManuallyDrop`.
let mut cloned_writer = unsafe {
let writer_raw_fd = writer.as_raw_fd();
ManuallyDrop::new(PipeWriter::from_raw_fd(writer_raw_fd))
};
self.send_cancellation.send(CancellationMessage::OpenedPipe { pipe: writer });
self.send_compression.send(CompressionMessage::Env { path, reader });
let mdb_path = env.path().join("data.mdb");
let mut file = std::fs::File::open(&mdb_path)?;
std::io::copy(&mut file, cloned_writer.deref_mut())?;
self.send_cancellation.send(CancellationMessage::ClosingPipe);
// no change we might want to commit
env_wtxn.abort();
Ok(())
}
pub fn add_file_to_archive(&mut self, path: PathBuf) -> anyhow::Result<()> {
self.send_compression.send(CompressionMessage::File { path });
Ok(())
}
pub fn add_dir_to_archive(&mut self, path: PathBuf) -> anyhow::Result<()> {
self.send_compression.send(CompressionMessage::Dir { path });
Ok(())
}
pub fn finish(self) -> anyhow::Result<File> {
drop(self.send_cancellation);
drop(self.send_compression);
/// FIXME catch panics
let file = self.processing_thread.join().unwrap()?;
self.cancellation_thread.join().unwrap();
Ok(file)
}
fn run_processing(
dest_dir: PathBuf,
dest_filename: String,
recv: Receiver<CompressionMessage>,
base_path: PathBuf,
) -> anyhow::Result<File> {
let mut temp_archive = tempfile::NamedTempFile::new_in(&dest_dir)?;
let gz_encoder = GzEncoder::new(&mut temp_archive, Compression::default());
let mut tar_encoder = Builder::new(gz_encoder);
let base_path_in_archive = PathInArchive::from_absolute_and_base(&base_path, &base_path);
// add the root
tar_encoder.append_dir(base_path_in_archive.as_path(), &base_path)?;
while let Ok(message) = recv.recv() {
match message {
CompressionMessage::Env { path, reader } => {
let dir_path_in_archive =
PathInArchive::from_absolute_and_base(&path, &base_path);
tar_encoder.append_dir(dir_path_in_archive.as_path(), &path)?;
let path = path.join("data.mdb");
Self::add_to_archive(&mut tar_encoder, &path, &base_path, reader)?;
}
CompressionMessage::File { path } => {
let path_in_archive = PathInArchive::from_absolute_and_base(&path, &base_path);
tar_encoder.append_path_with_name(&path, path_in_archive.as_path())?;
}
CompressionMessage::Dir { path } => {
let path_in_archive = PathInArchive::from_absolute_and_base(&path, &base_path);
tar_encoder.append_dir(path_in_archive.as_path(), &path)?;
}
}
}
let gz_encoder = tar_encoder.into_inner()?;
gz_encoder.finish()?;
temp_archive.flush()?;
let archive = temp_archive.persist(dest_dir.join(dest_filename))?;
Ok(archive)
}
fn run_cancellation<F>(must_stop_processing: F, recv: Receiver<CancellationMessage>)
where
F: Fn() -> bool + Send + 'static,
{
let mut current_pipe = None;
loop {
let next_message = match recv.recv_timeout(std::time::Duration::from_secs(60)) {
Ok(message) => message,
Err(RecvTimeoutError::Disconnected) => break,
Err(RecvTimeoutError::Timeout) => {
if must_stop_processing() {
break;
}
continue;
}
};
match next_message {
CancellationMessage::OpenedPipe { pipe } => current_pipe = Some(pipe),
CancellationMessage::ClosingPipe => current_pipe = None,
}
}
drop(current_pipe);
}
fn add_to_archive(
tar_encoder: &mut Builder<impl Write>,
path: &Path,
base: &Path,
reader: impl Read,
) -> anyhow::Result<()> {
let stats = path.metadata()?;
let mut header = Header::new_gnu();
header.set_metadata_in_mode(&stats, tar::HeaderMode::Complete);
let path_in_archive = PathInArchive::from_absolute_and_base(path, base);
tar_encoder.append_data(&mut header, path_in_archive.as_path(), reader)?;
Ok(())
}
}
enum CancellationMessage {
OpenedPipe { pipe: PipeWriter },
ClosingPipe,
}
struct PathInArchive(PathBuf);
impl PathInArchive {
pub fn from_absolute_and_base(absolute: &Path, base: &Path) -> Self {
/// FIXME
let canonical = absolute.canonicalize().unwrap();
let relative = match canonical.strip_prefix(base) {
Ok(stripped) => Path::new(&".").join(stripped),
Err(_) => absolute.to_path_buf(),
};
Self(relative)
}
pub fn as_path(&self) -> &Path {
self.0.as_path()
}
}

View File

@@ -1983,6 +1983,11 @@ impl Index {
Ok(sizes)
}
/// The underlying env for raw access
pub fn raw_env(&self) -> &heed::Env<WithoutTls> {
&self.env
}
}
pub struct EmbeddingsWithMetadata {