mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-11-22 04:36:32 +00:00
Compare commits
26 Commits
v1.22.1
...
prototype-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a2fc7ae5e8 | ||
|
|
b817f58991 | ||
|
|
c885171029 | ||
|
|
3870a374af | ||
|
|
d41716d8f0 | ||
|
|
43a6505435 | ||
|
|
467e15d9c0 | ||
|
|
91275adb76 | ||
|
|
c29bdcae23 | ||
|
|
75219181a3 | ||
|
|
a5b5cf7cd1 | ||
|
|
142ba8ea00 | ||
|
|
4bc823e07c | ||
|
|
db06ca7138 | ||
|
|
95595a768e | ||
|
|
36f649768e | ||
|
|
0c6fc243f2 | ||
|
|
dfc46d5627 | ||
|
|
11d55f2121 | ||
|
|
014da57cf6 | ||
|
|
70a0ff4a8f | ||
|
|
dd0d5e4b90 | ||
|
|
15b3bb1700 | ||
|
|
f25db0795e | ||
|
|
6f0d26c22c | ||
|
|
d52c7dcc94 |
1
.github/dependabot.yml
vendored
1
.github/dependabot.yml
vendored
@@ -7,6 +7,5 @@ updates:
|
||||
schedule:
|
||||
interval: "monthly"
|
||||
labels:
|
||||
- 'skip changelog'
|
||||
- 'dependencies'
|
||||
rebase-strategy: disabled
|
||||
|
||||
6
.github/release-draft-template.yml
vendored
6
.github/release-draft-template.yml
vendored
@@ -18,6 +18,7 @@ categories:
|
||||
label: 'security'
|
||||
- title: '⚙️ Maintenance/misc'
|
||||
label:
|
||||
- 'dependencies'
|
||||
- 'maintenance'
|
||||
- 'documentation'
|
||||
template: |
|
||||
@@ -26,8 +27,3 @@ template: |
|
||||
❤️ Huge thanks to our contributors: $CONTRIBUTORS.
|
||||
no-changes-template: 'Changes are coming soon 😎'
|
||||
sort-direction: 'ascending'
|
||||
replacers:
|
||||
- search: '/(?:and )?@dependabot-preview(?:\[bot\])?,?/g'
|
||||
replace: ''
|
||||
- search: '/(?:and )?@dependabot(?:\[bot\])?,?/g'
|
||||
replace: ''
|
||||
|
||||
2
.github/workflows/publish-docker-images.yml
vendored
2
.github/workflows/publish-docker-images.yml
vendored
@@ -65,7 +65,7 @@ jobs:
|
||||
uses: docker/setup-buildx-action@v3
|
||||
|
||||
- name: Install cosign
|
||||
uses: sigstore/cosign-installer@d58896d6a1865668819e1d91763c7751a165e159 # tag=v3.9.2
|
||||
uses: sigstore/cosign-installer@d7543c93d881b35a8faa02e8e3605f69b7a1ce62 # tag=v3.10.0
|
||||
|
||||
- name: Login to Docker Hub
|
||||
uses: docker/login-action@v3
|
||||
|
||||
13
.github/workflows/publish-release-assets.yml
vendored
13
.github/workflows/publish-release-assets.yml
vendored
@@ -11,7 +11,7 @@ jobs:
|
||||
check-version:
|
||||
name: Check the version validity
|
||||
runs-on: ubuntu-latest
|
||||
# No need to check the version for dry run (cron)
|
||||
# No need to check the version for dry run (cron or workflow_dispatch)
|
||||
steps:
|
||||
- uses: actions/checkout@v5
|
||||
# Check if the tag has the v<number>.<number>.<number> format.
|
||||
@@ -48,7 +48,7 @@ jobs:
|
||||
- uses: dtolnay/rust-toolchain@1.89
|
||||
- name: Build
|
||||
run: cargo build --release --locked
|
||||
# No need to upload binaries for dry run (cron)
|
||||
# No need to upload binaries for dry run (cron or workflow_dispatch)
|
||||
- name: Upload binaries to release
|
||||
if: github.event_name == 'release'
|
||||
uses: svenstaro/upload-release-action@2.11.2
|
||||
@@ -78,7 +78,7 @@ jobs:
|
||||
- uses: dtolnay/rust-toolchain@1.89
|
||||
- name: Build
|
||||
run: cargo build --release --locked
|
||||
# No need to upload binaries for dry run (cron)
|
||||
# No need to upload binaries for dry run (cron or workflow_dispatch)
|
||||
- name: Upload binaries to release
|
||||
if: github.event_name == 'release'
|
||||
uses: svenstaro/upload-release-action@2.11.2
|
||||
@@ -111,7 +111,7 @@ jobs:
|
||||
command: build
|
||||
args: --release --target ${{ matrix.target }}
|
||||
- name: Upload the binary to release
|
||||
# No need to upload binaries for dry run (cron)
|
||||
# No need to upload binaries for dry run (cron or workflow_dispatch)
|
||||
if: github.event_name == 'release'
|
||||
uses: svenstaro/upload-release-action@2.11.2
|
||||
with:
|
||||
@@ -176,7 +176,7 @@ jobs:
|
||||
- name: List target output files
|
||||
run: ls -lR ./target
|
||||
- name: Upload the binary to release
|
||||
# No need to upload binaries for dry run (cron)
|
||||
# No need to upload binaries for dry run (cron or workflow_dispatch)
|
||||
if: github.event_name == 'release'
|
||||
uses: svenstaro/upload-release-action@2.11.2
|
||||
with:
|
||||
@@ -187,6 +187,7 @@ jobs:
|
||||
|
||||
publish-openapi-file:
|
||||
name: Publish OpenAPI file
|
||||
needs: check-version
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout code
|
||||
@@ -201,7 +202,7 @@ jobs:
|
||||
cd crates/openapi-generator
|
||||
cargo run --release -- --pretty --output ../../meilisearch.json
|
||||
- name: Upload OpenAPI to Release
|
||||
# No need to upload for dry run (cron)
|
||||
# No need to upload for dry run (cron or workflow_dispatch)
|
||||
if: github.event_name == 'release'
|
||||
uses: svenstaro/upload-release-action@2.11.2
|
||||
with:
|
||||
|
||||
16
.github/workflows/sdks-tests.yml
vendored
16
.github/workflows/sdks-tests.yml
vendored
@@ -50,7 +50,7 @@ jobs:
|
||||
with:
|
||||
repository: meilisearch/meilisearch-dotnet
|
||||
- name: Setup .NET Core
|
||||
uses: actions/setup-dotnet@v4
|
||||
uses: actions/setup-dotnet@v5
|
||||
with:
|
||||
dotnet-version: "8.0.x"
|
||||
- name: Install dependencies
|
||||
@@ -100,7 +100,7 @@ jobs:
|
||||
- '7700:7700'
|
||||
steps:
|
||||
- name: Set up Go
|
||||
uses: actions/setup-go@v5
|
||||
uses: actions/setup-go@v6
|
||||
with:
|
||||
go-version: stable
|
||||
- uses: actions/checkout@v5
|
||||
@@ -135,13 +135,13 @@ jobs:
|
||||
- name: Set up Java
|
||||
uses: actions/setup-java@v5
|
||||
with:
|
||||
java-version: 8
|
||||
distribution: 'zulu'
|
||||
java-version: 17
|
||||
distribution: 'temurin'
|
||||
cache: gradle
|
||||
- name: Grant execute permission for gradlew
|
||||
run: chmod +x gradlew
|
||||
- name: Build and run unit and integration tests
|
||||
run: ./gradlew build integrationTest
|
||||
run: ./gradlew build integrationTest --info
|
||||
|
||||
meilisearch-js-tests:
|
||||
needs: define-docker-image
|
||||
@@ -160,7 +160,7 @@ jobs:
|
||||
with:
|
||||
repository: meilisearch/meilisearch-js
|
||||
- name: Setup node
|
||||
uses: actions/setup-node@v4
|
||||
uses: actions/setup-node@v5
|
||||
with:
|
||||
cache: 'yarn'
|
||||
- name: Install dependencies
|
||||
@@ -224,7 +224,7 @@ jobs:
|
||||
with:
|
||||
repository: meilisearch/meilisearch-python
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v5
|
||||
uses: actions/setup-python@v6
|
||||
- name: Install pipenv
|
||||
uses: dschep/install-pipenv-action@v1
|
||||
- name: Install dependencies
|
||||
@@ -318,7 +318,7 @@ jobs:
|
||||
with:
|
||||
repository: meilisearch/meilisearch-js-plugins
|
||||
- name: Setup node
|
||||
uses: actions/setup-node@v4
|
||||
uses: actions/setup-node@v5
|
||||
with:
|
||||
cache: yarn
|
||||
- name: Install dependencies
|
||||
|
||||
@@ -121,7 +121,7 @@ If you want to know more about the kind of data we collect and what we use it fo
|
||||
|
||||
Meilisearch is a search engine created by [Meili](https://www.meilisearch.com/careers), a software development company headquartered in France and with team members all over the world. Want to know more about us? [Check out our blog!](https://blog.meilisearch.com/?utm_campaign=oss&utm_source=github&utm_medium=meilisearch&utm_content=contact)
|
||||
|
||||
🗞 [Subscribe to our newsletter](https://meilisearch.us2.list-manage.com/subscribe?u=27870f7b71c908a8b359599fb&id=79582d828e) if you don't want to miss any updates! We promise we won't clutter your mailbox: we only send one edition every two months.
|
||||
🗞 [Subscribe to our newsletter](https://share-eu1.hsforms.com/1LN5N0x_GQgq7ss7tXmSykwfg3aq) if you don't want to miss any updates! We promise we won't clutter your mailbox: we only send one edition every two months.
|
||||
|
||||
💌 Want to make a suggestion or give feedback? Here are some of the channels where you can reach us:
|
||||
|
||||
|
||||
@@ -33,6 +33,10 @@ impl FileStore {
|
||||
std::fs::create_dir_all(&path)?;
|
||||
Ok(FileStore { path })
|
||||
}
|
||||
|
||||
/// Returns the path held by this `FileStore`.
pub fn path(&self) -> &Path {
|
||||
&self.path
|
||||
}
|
||||
}
|
||||
|
||||
impl FileStore {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::path::PathBuf;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::time::Duration;
|
||||
use std::{fs, thread};
|
||||
@@ -591,4 +591,8 @@ impl IndexMapper {
|
||||
pub fn set_currently_updating_index(&self, index: Option<(String, Index)>) {
|
||||
*self.currently_updating_index.write().unwrap() = index;
|
||||
}
|
||||
|
||||
/// Returns the `base_path` held by this `IndexMapper`.
pub fn base_path(&self) -> &Path {
|
||||
&self.base_path
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,6 +50,11 @@ impl MustStopProcessing {
|
||||
pub fn reset(&self) {
|
||||
self.0.store(false, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn as_lambda(&self) -> impl Fn() -> bool + Send + Sync + 'static {
|
||||
let clone = self.clone();
|
||||
move || clone.get()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Scheduler {
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::sync::atomic::Ordering;
|
||||
|
||||
use meilisearch_types::heed::CompactionOption;
|
||||
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
|
||||
use meilisearch_types::milli::InternalError;
|
||||
use meilisearch_types::tasks::{Status, Task};
|
||||
use meilisearch_types::{compression, VERSION_FILE_NAME};
|
||||
|
||||
@@ -76,6 +77,22 @@ unsafe fn remove_tasks(
|
||||
|
||||
impl IndexScheduler {
|
||||
pub(super) fn process_snapshot(
|
||||
&self,
|
||||
progress: Progress,
|
||||
tasks: Vec<Task>,
|
||||
) -> Result<Vec<Task>> {
|
||||
let compaction_option = if self.scheduler.experimental_no_snapshot_compaction {
|
||||
CompactionOption::Disabled
|
||||
} else {
|
||||
CompactionOption::Enabled
|
||||
};
|
||||
match compaction_option {
|
||||
CompactionOption::Enabled => self.process_snapshot_with_temp(progress, tasks),
|
||||
CompactionOption::Disabled => self.process_snapshot_with_pipe(progress, tasks),
|
||||
}
|
||||
}
|
||||
|
||||
fn process_snapshot_with_temp(
|
||||
&self,
|
||||
progress: Progress,
|
||||
mut tasks: Vec<Task>,
|
||||
@@ -105,12 +122,8 @@ impl IndexScheduler {
|
||||
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
|
||||
let dst = temp_snapshot_dir.path().join("tasks");
|
||||
fs::create_dir_all(&dst)?;
|
||||
let compaction_option = if self.scheduler.experimental_no_snapshot_compaction {
|
||||
CompactionOption::Disabled
|
||||
} else {
|
||||
CompactionOption::Enabled
|
||||
};
|
||||
self.env.copy_to_path(dst.join("data.mdb"), compaction_option)?;
|
||||
|
||||
self.env.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)?;
|
||||
|
||||
// 2.2 Remove the current snapshot tasks
|
||||
//
|
||||
@@ -161,7 +174,7 @@ impl IndexScheduler {
|
||||
let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string());
|
||||
fs::create_dir_all(&dst)?;
|
||||
index
|
||||
.copy_to_path(dst.join("data.mdb"), compaction_option)
|
||||
.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)
|
||||
.map_err(|e| Error::from_milli(e, Some(name.to_string())))?;
|
||||
}
|
||||
|
||||
@@ -171,7 +184,7 @@ impl IndexScheduler {
|
||||
progress.update_progress(SnapshotCreationProgress::SnapshotTheApiKeys);
|
||||
let dst = temp_snapshot_dir.path().join("auth");
|
||||
fs::create_dir_all(&dst)?;
|
||||
self.scheduler.auth_env.copy_to_path(dst.join("data.mdb"), compaction_option)?;
|
||||
self.scheduler.auth_env.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)?;
|
||||
|
||||
// 5. Copy and tarball the flat snapshot
|
||||
progress.update_progress(SnapshotCreationProgress::CreateTheTarball);
|
||||
@@ -206,4 +219,139 @@ impl IndexScheduler {
|
||||
|
||||
Ok(tasks)
|
||||
}
|
||||
|
||||
fn process_snapshot_with_pipe(
|
||||
&self,
|
||||
progress: Progress,
|
||||
mut tasks: Vec<Task>,
|
||||
) -> Result<Vec<Task>> {
|
||||
progress.update_progress(SnapshotCreationProgress::StartTheSnapshotCreation);
|
||||
let must_stop_processing = &self.scheduler.must_stop_processing;
|
||||
let abort_no_index = Err(Error::from_milli(InternalError::AbortedIndexation.into(), None));
|
||||
|
||||
fs::create_dir_all(&self.scheduler.snapshots_path)?;
|
||||
|
||||
// 1. Find the base path and original name of the database
|
||||
|
||||
// TODO find a better way to get this path
|
||||
let mut base_path = self.env.path().to_owned();
|
||||
base_path.pop();
|
||||
let base_path = base_path;
|
||||
let db_name = base_path.file_name().and_then(OsStr::to_str).unwrap_or("data.ms");
|
||||
|
||||
// 2. Start the tarball builder. The tarball will be created on another thread from piped data.
|
||||
|
||||
let mut builder = compression::PipedArchiveBuilder::new(
|
||||
self.scheduler.snapshots_path.clone(),
|
||||
base_path.clone(),
|
||||
must_stop_processing.as_lambda(),
|
||||
);
|
||||
|
||||
// 3. Snapshot the VERSION file
|
||||
builder.add_file_to_archive(self.scheduler.version_file_path.clone())?;
|
||||
if must_stop_processing.get() {
|
||||
return abort_no_index;
|
||||
}
|
||||
|
||||
// 4. Snapshot the index-scheduler LMDB env
|
||||
//
|
||||
// When we call copy_to_path, LMDB opens a read transaction by itself,
|
||||
// we can't provide our own. It is an issue as we would like to know
|
||||
// the update files to copy but new ones can be enqueued between the copy
|
||||
// of the env and the new transaction we open to retrieve the enqueued tasks.
|
||||
// So we prefer opening a new transaction after copying the env and copy more
|
||||
// update files than not enough.
|
||||
//
|
||||
// Note that there cannot be any update files deleted between those
|
||||
// two read operations as the task processing is synchronous.
|
||||
|
||||
// 4.1 First copy the LMDB env of the index-scheduler
|
||||
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
|
||||
builder.add_env_to_archive(&self.env)?;
|
||||
if must_stop_processing.get() {
|
||||
return abort_no_index;
|
||||
}
|
||||
|
||||
// 4.2 Create a read transaction on the index-scheduler
|
||||
let rtxn = self.env.read_txn()?;
|
||||
|
||||
// 4.3 Only copy the update files of the enqueued tasks
|
||||
progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles);
|
||||
builder.add_dir_to_archive(self.queue.file_store.path().to_path_buf())?;
|
||||
let enqueued = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
|
||||
let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32);
|
||||
progress.update_progress(update_file_progress);
|
||||
for task_id in enqueued {
|
||||
if must_stop_processing.get() {
|
||||
return abort_no_index;
|
||||
}
|
||||
let task =
|
||||
self.queue.tasks.get_task(&rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
|
||||
if let Some(content_uuid) = task.content_uuid() {
|
||||
let src = self.queue.file_store.get_update_path(content_uuid);
|
||||
builder.add_file_to_archive(src)?;
|
||||
}
|
||||
atomic.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
// 5. Snapshot every index
|
||||
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexes);
|
||||
builder.add_dir_to_archive(self.index_mapper.base_path().to_path_buf())?;
|
||||
let index_mapping = self.index_mapper.index_mapping;
|
||||
let nb_indexes = index_mapping.len(&rtxn)? as u32;
|
||||
|
||||
for (i, result) in index_mapping.iter(&rtxn)?.enumerate() {
|
||||
let (name, _) = result?;
|
||||
let abort_index = || {
|
||||
Err(Error::from_milli(
|
||||
InternalError::AbortedIndexation.into(),
|
||||
Some(name.to_string()), // defer the `to_string`
|
||||
))
|
||||
};
|
||||
|
||||
if must_stop_processing.get() {
|
||||
return abort_index();
|
||||
}
|
||||
|
||||
progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
|
||||
name, i as u32, nb_indexes,
|
||||
));
|
||||
let index = self.index_mapper.index(&rtxn, name)?;
|
||||
builder.add_env_to_archive(index.raw_env())?;
|
||||
}
|
||||
|
||||
drop(rtxn);
|
||||
|
||||
if must_stop_processing.get() {
|
||||
return abort_no_index;
|
||||
}
|
||||
|
||||
// 6. Snapshot the auth LMDB env
|
||||
progress.update_progress(SnapshotCreationProgress::SnapshotTheApiKeys);
|
||||
builder.add_env_to_archive(&self.scheduler.auth_env)?;
|
||||
|
||||
// 7. Finalize the tarball
|
||||
progress.update_progress(SnapshotCreationProgress::CreateTheTarball);
|
||||
let file =
|
||||
builder.finish(&self.scheduler.snapshots_path.join(format!("{db_name}.snapshot")))?;
|
||||
|
||||
// 8. Change the permission to make the snapshot readonly
|
||||
let mut permissions = file.metadata()?.permissions();
|
||||
permissions.set_readonly(true);
|
||||
#[cfg(unix)]
|
||||
{
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
#[allow(clippy::non_octal_unix_permissions)]
|
||||
// rwxrwxrwx
|
||||
permissions.set_mode(0b100100100);
|
||||
}
|
||||
|
||||
file.set_permissions(permissions)?;
|
||||
|
||||
for task in &mut tasks {
|
||||
task.status = Status::Succeeded;
|
||||
}
|
||||
|
||||
Ok(tasks)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,11 +1,18 @@
|
||||
use std::fs::{create_dir_all, File};
|
||||
use std::io::Write;
|
||||
use std::path::Path;
|
||||
use std::io::{PipeWriter, Read, Write};
|
||||
use std::mem::ManuallyDrop;
|
||||
use std::ops::DerefMut;
|
||||
use std::os::fd::{AsRawFd, FromRawFd};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
|
||||
use std::thread::JoinHandle;
|
||||
|
||||
use flate2::read::GzDecoder;
|
||||
use flate2::write::GzEncoder;
|
||||
use flate2::Compression;
|
||||
use tar::{Archive, Builder};
|
||||
use milli::heed::Env;
|
||||
use tar::{Archive, Builder, Header};
|
||||
use tempfile::NamedTempFile;
|
||||
|
||||
pub fn to_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> {
|
||||
let mut f = File::create(dest)?;
|
||||
@@ -26,3 +33,222 @@ pub fn from_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Res
|
||||
ar.unpack(&dest)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub struct PipedArchiveBuilder {
|
||||
send_compression: Sender<CompressionMessage>,
|
||||
send_cancellation: Sender<CancellationMessage>,
|
||||
processing_thread: JoinHandle<anyhow::Result<NamedTempFile>>,
|
||||
cancellation_thread: JoinHandle<()>,
|
||||
}
|
||||
|
||||
enum CompressionMessage {
|
||||
Env { path: PathBuf, reader: std::io::PipeReader },
|
||||
File { path: PathBuf },
|
||||
Dir { path: PathBuf },
|
||||
}
|
||||
|
||||
impl PipedArchiveBuilder {
|
||||
pub fn new<F>(dest_dir: PathBuf, base_path: PathBuf, must_stop_processing: F) -> Self
|
||||
where
|
||||
F: Fn() -> bool + Send + 'static,
|
||||
{
|
||||
let (send_compression, recv) = std::sync::mpsc::channel();
|
||||
let processing_thread = std::thread::Builder::new()
|
||||
.name("piped-archive-builder".into())
|
||||
.spawn(|| Self::run_processing(dest_dir, recv, base_path))
|
||||
.unwrap();
|
||||
|
||||
let (send_cancellation, recv) = std::sync::mpsc::channel();
|
||||
|
||||
let cancellation_thread = std::thread::Builder::new()
|
||||
.name("piped-archive-builder-cancellation".into())
|
||||
.spawn(|| Self::run_cancellation(must_stop_processing, recv))
|
||||
.unwrap();
|
||||
|
||||
Self { send_compression, send_cancellation, processing_thread, cancellation_thread }
|
||||
}
|
||||
|
||||
/// Add a heed environment to the archive.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// - Errors originating with that thread:
|
||||
/// - Heed errors, if taking a write transaction fails
|
||||
/// - If the copy of the environment fails.
|
||||
/// - If there is an I/O error opening the database at the environment's path.
|
||||
/// - Errors originating with another thread:
|
||||
/// - If the cancellation thread panicked or otherwise dropped its receiver.
|
||||
/// - If the processing thread panicked or otherwise dropped its receiver.
|
||||
pub fn add_env_to_archive<T>(&mut self, env: &Env<T>) -> anyhow::Result<()> {
|
||||
let (reader, writer) = std::io::pipe()?;
|
||||
let path = env.path().to_path_buf();
|
||||
// make sure that the environment cannot change while it is being added to the archive,
|
||||
// as any concurrent change would corrupt the copy.
|
||||
let env_wtxn = env.write_txn()?;
|
||||
|
||||
// SAFETY: only the cancellation thread has the actual responsibility of closing the pipe since
|
||||
// the clone is `ManuallyDrop`.
|
||||
let mut cloned_writer = unsafe {
|
||||
let writer_raw_fd = writer.as_raw_fd();
|
||||
ManuallyDrop::new(PipeWriter::from_raw_fd(writer_raw_fd))
|
||||
};
|
||||
|
||||
self.send_cancellation.send(CancellationMessage::OpenedPipe { pipe: writer });
|
||||
|
||||
self.send_compression.send(CompressionMessage::Env { path, reader });
|
||||
|
||||
let mdb_path = env.path().join("data.mdb");
|
||||
let mut file = std::fs::File::open(&mdb_path)?;
|
||||
let mut file = std::io::BufReader::with_capacity(16 * 4096, &mut file);
|
||||
std::io::copy(&mut file, cloned_writer.deref_mut())?;
|
||||
|
||||
self.send_cancellation.send(CancellationMessage::ClosingPipe);
|
||||
|
||||
// no change we might want to commit
|
||||
env_wtxn.abort();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Add a file to the archive
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// - If the processing thread panicked or otherwise dropped its receiver.
|
||||
pub fn add_file_to_archive(&mut self, path: PathBuf) -> anyhow::Result<()> {
|
||||
self.send_compression.send(CompressionMessage::File { path });
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Add a directory name (**without its contents**) to the archive.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// - If the processing thread panicked or otherwise dropped its receiver.
|
||||
pub fn add_dir_to_archive(&mut self, path: PathBuf) -> anyhow::Result<()> {
|
||||
self.send_compression.send(CompressionMessage::Dir { path });
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Finalize the archive and persists it to disk.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// - Originating with the current thread:
|
||||
/// - If persisting the archive fails
|
||||
/// - Originating with another thread:
|
||||
/// - If the cancellation thread panicked.
|
||||
/// - If the processing thread panicked or otherwise terminated in error.
|
||||
pub fn finish(self, dest_path: &Path) -> anyhow::Result<File> {
|
||||
drop(self.send_cancellation);
|
||||
drop(self.send_compression);
|
||||
/// FIXME catch panics
|
||||
let temp_archive = self.processing_thread.join().unwrap()?;
|
||||
self.cancellation_thread.join().unwrap();
|
||||
let archive = temp_archive.persist(dest_path)?;
|
||||
Ok(archive)
|
||||
}
|
||||
|
||||
fn run_processing(
|
||||
dest_dir: PathBuf,
|
||||
recv: Receiver<CompressionMessage>,
|
||||
base_path: PathBuf,
|
||||
) -> anyhow::Result<NamedTempFile> {
|
||||
let mut temp_archive = tempfile::NamedTempFile::new_in(&dest_dir)?;
|
||||
|
||||
let gz_encoder = GzEncoder::new(&mut temp_archive, Compression::default());
|
||||
let mut tar_encoder = Builder::new(gz_encoder);
|
||||
let base_path_in_archive = PathInArchive::from_absolute_and_base(&base_path, &base_path);
|
||||
// add the root
|
||||
tar_encoder.append_dir(base_path_in_archive.as_path(), &base_path)?;
|
||||
while let Ok(message) = recv.recv() {
|
||||
match message {
|
||||
CompressionMessage::Env { path, reader } => {
|
||||
let dir_path_in_archive =
|
||||
PathInArchive::from_absolute_and_base(&path, &base_path);
|
||||
|
||||
tar_encoder.append_dir(dir_path_in_archive.as_path(), &path)?;
|
||||
|
||||
let path = path.join("data.mdb");
|
||||
Self::add_to_archive(&mut tar_encoder, &path, &base_path, reader)?;
|
||||
}
|
||||
CompressionMessage::File { path } => {
|
||||
let path_in_archive = PathInArchive::from_absolute_and_base(&path, &base_path);
|
||||
tar_encoder.append_path_with_name(&path, path_in_archive.as_path())?;
|
||||
}
|
||||
CompressionMessage::Dir { path } => {
|
||||
let path_in_archive = PathInArchive::from_absolute_and_base(&path, &base_path);
|
||||
|
||||
tar_encoder.append_dir(path_in_archive.as_path(), &path)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let gz_encoder = tar_encoder.into_inner()?;
|
||||
gz_encoder.finish()?;
|
||||
temp_archive.flush()?;
|
||||
Ok(temp_archive)
|
||||
}
|
||||
|
||||
fn run_cancellation<F>(must_stop_processing: F, recv: Receiver<CancellationMessage>)
|
||||
where
|
||||
F: Fn() -> bool + Send + 'static,
|
||||
{
|
||||
let mut current_pipe = None;
|
||||
loop {
|
||||
let next_message = match recv.recv_timeout(std::time::Duration::from_secs(60)) {
|
||||
Ok(message) => message,
|
||||
Err(RecvTimeoutError::Disconnected) => break,
|
||||
Err(RecvTimeoutError::Timeout) => {
|
||||
if must_stop_processing() {
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
};
|
||||
match next_message {
|
||||
CancellationMessage::OpenedPipe { pipe } => current_pipe = Some(pipe),
|
||||
CancellationMessage::ClosingPipe => current_pipe = None,
|
||||
}
|
||||
}
|
||||
drop(current_pipe);
|
||||
}
|
||||
|
||||
fn add_to_archive(
|
||||
tar_encoder: &mut Builder<impl Write>,
|
||||
path: &Path,
|
||||
base: &Path,
|
||||
reader: impl Read,
|
||||
) -> anyhow::Result<()> {
|
||||
let stats = path.metadata()?;
|
||||
let mut header = Header::new_gnu();
|
||||
header.set_metadata_in_mode(&stats, tar::HeaderMode::Complete);
|
||||
let path_in_archive = PathInArchive::from_absolute_and_base(path, base);
|
||||
|
||||
tar_encoder.append_data(&mut header, path_in_archive.as_path(), reader)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
enum CancellationMessage {
|
||||
OpenedPipe { pipe: PipeWriter },
|
||||
ClosingPipe,
|
||||
}
|
||||
|
||||
/// Location of an entry inside the archive.
struct PathInArchive(PathBuf);

impl PathInArchive {
    /// Computes where `absolute` must be stored in the archive, relative to `base`.
    ///
    /// Both paths are canonicalized before being compared, so that symbolic links
    /// or a relative `base` do not prevent the prefix from being recognised. A path
    /// that cannot be canonicalized (for instance because it does not exist) is
    /// compared as given instead of causing a panic.
    ///
    /// Returns `./<path relative to base>` when `absolute` is located under `base`,
    /// and `absolute` unchanged otherwise.
    pub fn from_absolute_and_base(absolute: &Path, base: &Path) -> Self {
        let canonical = absolute.canonicalize().unwrap_or_else(|_| absolute.to_path_buf());
        let canonical_base = base.canonicalize().unwrap_or_else(|_| base.to_path_buf());
        let relative = match canonical.strip_prefix(&canonical_base) {
            Ok(stripped) => Path::new(".").join(stripped),
            Err(_) => absolute.to_path_buf(),
        };

        Self(relative)
    }

    /// Returns the location inside the archive.
    pub fn as_path(&self) -> &Path {
        self.0.as_path()
    }
}
|
||||
|
||||
@@ -1983,6 +1983,11 @@ impl Index {
|
||||
|
||||
Ok(sizes)
|
||||
}
|
||||
|
||||
/// Returns a reference to the underlying `heed` environment of this index, for raw access.
|
||||
pub fn raw_env(&self) -> &heed::Env<WithoutTls> {
|
||||
&self.env
|
||||
}
|
||||
}
|
||||
|
||||
pub struct EmbeddingsWithMetadata {
|
||||
|
||||
Reference in New Issue
Block a user