mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-12-01 02:05:36 +00:00
Compare commits
8 Commits
prototype-
...
prototype-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a2fc7ae5e8 | ||
|
|
b817f58991 | ||
|
|
c885171029 | ||
|
|
3870a374af | ||
|
|
d41716d8f0 | ||
|
|
43a6505435 | ||
|
|
467e15d9c0 | ||
|
|
91275adb76 |
24
Cargo.lock
generated
24
Cargo.lock
generated
@@ -453,8 +453,9 @@ checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50"
|
||||
|
||||
[[package]]
|
||||
name = "arroy"
|
||||
version = "0.6.4-nested-rtxns"
|
||||
source = "git+https://github.com/meilisearch/arroy?branch=use-heed-nested-rtxns#61c8f4f0addeff968e80438018d0aee2c1eb8d67"
|
||||
version = "0.6.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8578a72223dfa13dfd9fc144d15260d134361789ebdea9b16e85a511edc73c7d"
|
||||
dependencies = [
|
||||
"bytemuck",
|
||||
"byteorder",
|
||||
@@ -1074,8 +1075,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "cellulite"
|
||||
version = "0.3.1-nested-rtxns"
|
||||
source = "git+https://github.com/meilisearch/cellulite?branch=use-heed-nested-rtxns#9fb1866cc49277d26f606769112fa704944ccc61"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "71a41aa2cd021bc3f23d97cc1e645848ca8c279fc757d1570ba7fe7ddc021290"
|
||||
dependencies = [
|
||||
"crossbeam",
|
||||
"geo",
|
||||
@@ -2756,8 +2758,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "hannoy"
|
||||
version = "0.0.9-nested-rtxns"
|
||||
source = "git+https://github.com/nnethercott/hannoy?branch=use-heed-nested-rtxns#d4ca5454eff6539e9fc2119f07113abebbda0a39"
|
||||
version = "0.0.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0dba13a271c49a119a97862ebf0a74131d879832868400d9fcd937b790058fdd"
|
||||
dependencies = [
|
||||
"bytemuck",
|
||||
"byteorder",
|
||||
@@ -2835,9 +2838,9 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
|
||||
|
||||
[[package]]
|
||||
name = "heed"
|
||||
version = "0.22.1-nested-rtxns"
|
||||
version = "0.22.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0ff115ba5712b1f1fc7617b195f5c2f139e29c397ff79da040cd19db75ccc240"
|
||||
checksum = "6a56c94661ddfb51aa9cdfbf102cfcc340aa69267f95ebccc4af08d7c530d393"
|
||||
dependencies = [
|
||||
"bitflags 2.9.4",
|
||||
"byteorder",
|
||||
@@ -2847,6 +2850,7 @@ dependencies = [
|
||||
"lmdb-master-sys",
|
||||
"once_cell",
|
||||
"page_size",
|
||||
"serde",
|
||||
"synchronoise",
|
||||
"url",
|
||||
]
|
||||
@@ -3885,9 +3889,9 @@ checksum = "241eaef5fd12c88705a01fc1066c48c4b36e0dd4377dcdc7ec3942cea7a69956"
|
||||
|
||||
[[package]]
|
||||
name = "lmdb-master-sys"
|
||||
version = "0.2.6-nested-rtxns"
|
||||
version = "0.2.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f4ff85130e3c994b36877045fbbb138d521dea7197bfc19dc3d5d95101a8e20a"
|
||||
checksum = "864808e0b19fb6dd3b70ba94ee671b82fce17554cf80aeb0a155c65bb08027df"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"doxygen-rs",
|
||||
|
||||
@@ -33,6 +33,10 @@ impl FileStore {
|
||||
std::fs::create_dir_all(&path)?;
|
||||
Ok(FileStore { path })
|
||||
}
|
||||
|
||||
pub fn path(&self) -> &Path {
|
||||
&self.path
|
||||
}
|
||||
}
|
||||
|
||||
impl FileStore {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::path::PathBuf;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::time::Duration;
|
||||
use std::{fs, thread};
|
||||
@@ -591,4 +591,8 @@ impl IndexMapper {
|
||||
pub fn set_currently_updating_index(&self, index: Option<(String, Index)>) {
|
||||
*self.currently_updating_index.write().unwrap() = index;
|
||||
}
|
||||
|
||||
pub fn base_path(&self) -> &Path {
|
||||
&self.base_path
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,6 +50,11 @@ impl MustStopProcessing {
|
||||
pub fn reset(&self) {
|
||||
self.0.store(false, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn as_lambda(&self) -> impl Fn() -> bool + Send + Sync + 'static {
|
||||
let clone = self.clone();
|
||||
move || clone.get()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Scheduler {
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::sync::atomic::Ordering;
|
||||
|
||||
use meilisearch_types::heed::CompactionOption;
|
||||
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
|
||||
use meilisearch_types::milli::InternalError;
|
||||
use meilisearch_types::tasks::{Status, Task};
|
||||
use meilisearch_types::{compression, VERSION_FILE_NAME};
|
||||
|
||||
@@ -76,6 +77,22 @@ unsafe fn remove_tasks(
|
||||
|
||||
impl IndexScheduler {
|
||||
pub(super) fn process_snapshot(
|
||||
&self,
|
||||
progress: Progress,
|
||||
tasks: Vec<Task>,
|
||||
) -> Result<Vec<Task>> {
|
||||
let compaction_option = if self.scheduler.experimental_no_snapshot_compaction {
|
||||
CompactionOption::Disabled
|
||||
} else {
|
||||
CompactionOption::Enabled
|
||||
};
|
||||
match compaction_option {
|
||||
CompactionOption::Enabled => self.process_snapshot_with_temp(progress, tasks),
|
||||
CompactionOption::Disabled => self.process_snapshot_with_pipe(progress, tasks),
|
||||
}
|
||||
}
|
||||
|
||||
fn process_snapshot_with_temp(
|
||||
&self,
|
||||
progress: Progress,
|
||||
mut tasks: Vec<Task>,
|
||||
@@ -105,12 +122,8 @@ impl IndexScheduler {
|
||||
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
|
||||
let dst = temp_snapshot_dir.path().join("tasks");
|
||||
fs::create_dir_all(&dst)?;
|
||||
let compaction_option = if self.scheduler.experimental_no_snapshot_compaction {
|
||||
CompactionOption::Disabled
|
||||
} else {
|
||||
CompactionOption::Enabled
|
||||
};
|
||||
self.env.copy_to_path(dst.join("data.mdb"), compaction_option)?;
|
||||
|
||||
self.env.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)?;
|
||||
|
||||
// 2.2 Remove the current snapshot tasks
|
||||
//
|
||||
@@ -161,7 +174,7 @@ impl IndexScheduler {
|
||||
let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string());
|
||||
fs::create_dir_all(&dst)?;
|
||||
index
|
||||
.copy_to_path(dst.join("data.mdb"), compaction_option)
|
||||
.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)
|
||||
.map_err(|e| Error::from_milli(e, Some(name.to_string())))?;
|
||||
}
|
||||
|
||||
@@ -171,7 +184,7 @@ impl IndexScheduler {
|
||||
progress.update_progress(SnapshotCreationProgress::SnapshotTheApiKeys);
|
||||
let dst = temp_snapshot_dir.path().join("auth");
|
||||
fs::create_dir_all(&dst)?;
|
||||
self.scheduler.auth_env.copy_to_path(dst.join("data.mdb"), compaction_option)?;
|
||||
self.scheduler.auth_env.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)?;
|
||||
|
||||
// 5. Copy and tarball the flat snapshot
|
||||
progress.update_progress(SnapshotCreationProgress::CreateTheTarball);
|
||||
@@ -206,4 +219,139 @@ impl IndexScheduler {
|
||||
|
||||
Ok(tasks)
|
||||
}
|
||||
|
||||
fn process_snapshot_with_pipe(
|
||||
&self,
|
||||
progress: Progress,
|
||||
mut tasks: Vec<Task>,
|
||||
) -> Result<Vec<Task>> {
|
||||
progress.update_progress(SnapshotCreationProgress::StartTheSnapshotCreation);
|
||||
let must_stop_processing = &self.scheduler.must_stop_processing;
|
||||
let abort_no_index = Err(Error::from_milli(InternalError::AbortedIndexation.into(), None));
|
||||
|
||||
fs::create_dir_all(&self.scheduler.snapshots_path)?;
|
||||
|
||||
// 1. Find the base path and original name of the database
|
||||
|
||||
// TODO find a better way to get this path
|
||||
let mut base_path = self.env.path().to_owned();
|
||||
base_path.pop();
|
||||
let base_path = base_path;
|
||||
let db_name = base_path.file_name().and_then(OsStr::to_str).unwrap_or("data.ms");
|
||||
|
||||
// 2. Start the tarball builder. The tarball will be created on another thread from piped data.
|
||||
|
||||
let mut builder = compression::PipedArchiveBuilder::new(
|
||||
self.scheduler.snapshots_path.clone(),
|
||||
base_path.clone(),
|
||||
must_stop_processing.as_lambda(),
|
||||
);
|
||||
|
||||
// 3. Snapshot the VERSION file
|
||||
builder.add_file_to_archive(self.scheduler.version_file_path.clone())?;
|
||||
if must_stop_processing.get() {
|
||||
return abort_no_index;
|
||||
}
|
||||
|
||||
// 4. Snapshot the index-scheduler LMDB env
|
||||
//
|
||||
// When we call copy_to_path, LMDB opens a read transaction by itself,
|
||||
// we can't provide our own. It is an issue as we would like to know
|
||||
// the update files to copy but new ones can be enqueued between the copy
|
||||
// of the env and the new transaction we open to retrieve the enqueued tasks.
|
||||
// So we prefer opening a new transaction after copying the env and copy more
|
||||
// update files than not enough.
|
||||
//
|
||||
// Note that there cannot be any update files deleted between those
|
||||
// two read operations as the task processing is synchronous.
|
||||
|
||||
// 4.1 First copy the LMDB env of the index-scheduler
|
||||
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
|
||||
builder.add_env_to_archive(&self.env)?;
|
||||
if must_stop_processing.get() {
|
||||
return abort_no_index;
|
||||
}
|
||||
|
||||
// 4.2 Create a read transaction on the index-scheduler
|
||||
let rtxn = self.env.read_txn()?;
|
||||
|
||||
// 4.3 Only copy the update files of the enqueued tasks
|
||||
progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles);
|
||||
builder.add_dir_to_archive(self.queue.file_store.path().to_path_buf())?;
|
||||
let enqueued = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
|
||||
let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32);
|
||||
progress.update_progress(update_file_progress);
|
||||
for task_id in enqueued {
|
||||
if must_stop_processing.get() {
|
||||
return abort_no_index;
|
||||
}
|
||||
let task =
|
||||
self.queue.tasks.get_task(&rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
|
||||
if let Some(content_uuid) = task.content_uuid() {
|
||||
let src = self.queue.file_store.get_update_path(content_uuid);
|
||||
builder.add_file_to_archive(src)?;
|
||||
}
|
||||
atomic.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
// 5. Snapshot every index
|
||||
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexes);
|
||||
builder.add_dir_to_archive(self.index_mapper.base_path().to_path_buf())?;
|
||||
let index_mapping = self.index_mapper.index_mapping;
|
||||
let nb_indexes = index_mapping.len(&rtxn)? as u32;
|
||||
|
||||
for (i, result) in index_mapping.iter(&rtxn)?.enumerate() {
|
||||
let (name, _) = result?;
|
||||
let abort_index = || {
|
||||
Err(Error::from_milli(
|
||||
InternalError::AbortedIndexation.into(),
|
||||
Some(name.to_string()), // defer the `to_string`
|
||||
))
|
||||
};
|
||||
|
||||
if must_stop_processing.get() {
|
||||
return abort_index();
|
||||
}
|
||||
|
||||
progress.update_progress(VariableNameStep::<SnapshotCreationProgress>::new(
|
||||
name, i as u32, nb_indexes,
|
||||
));
|
||||
let index = self.index_mapper.index(&rtxn, name)?;
|
||||
builder.add_env_to_archive(index.raw_env())?;
|
||||
}
|
||||
|
||||
drop(rtxn);
|
||||
|
||||
if must_stop_processing.get() {
|
||||
return abort_no_index;
|
||||
}
|
||||
|
||||
// 6. Snapshot the auth LMDB env
|
||||
progress.update_progress(SnapshotCreationProgress::SnapshotTheApiKeys);
|
||||
builder.add_env_to_archive(&self.scheduler.auth_env)?;
|
||||
|
||||
// 7. Finalize the tarball
|
||||
progress.update_progress(SnapshotCreationProgress::CreateTheTarball);
|
||||
let file =
|
||||
builder.finish(&self.scheduler.snapshots_path.join(format!("{db_name}.snapshot")))?;
|
||||
|
||||
// 8. Change the permission to make the snapshot readonly
|
||||
let mut permissions = file.metadata()?.permissions();
|
||||
permissions.set_readonly(true);
|
||||
#[cfg(unix)]
|
||||
{
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
#[allow(clippy::non_octal_unix_permissions)]
|
||||
// rwxrwxrwx
|
||||
permissions.set_mode(0b100100100);
|
||||
}
|
||||
|
||||
file.set_permissions(permissions)?;
|
||||
|
||||
for task in &mut tasks {
|
||||
task.status = Status::Succeeded;
|
||||
}
|
||||
|
||||
Ok(tasks)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,11 +1,18 @@
|
||||
use std::fs::{create_dir_all, File};
|
||||
use std::io::Write;
|
||||
use std::path::Path;
|
||||
use std::io::{PipeWriter, Read, Write};
|
||||
use std::mem::ManuallyDrop;
|
||||
use std::ops::DerefMut;
|
||||
use std::os::fd::{AsRawFd, FromRawFd};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
|
||||
use std::thread::JoinHandle;
|
||||
|
||||
use flate2::read::GzDecoder;
|
||||
use flate2::write::GzEncoder;
|
||||
use flate2::Compression;
|
||||
use tar::{Archive, Builder};
|
||||
use milli::heed::Env;
|
||||
use tar::{Archive, Builder, Header};
|
||||
use tempfile::NamedTempFile;
|
||||
|
||||
pub fn to_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> {
|
||||
let mut f = File::create(dest)?;
|
||||
@@ -26,3 +33,222 @@ pub fn from_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Res
|
||||
ar.unpack(&dest)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Builds a gzipped tarball on a dedicated thread from messages sent by the caller.
///
/// Two threads are spawned by `new`: one that writes the archive and one that
/// holds the write end of the pipe currently in use and polls the
/// `must_stop_processing` callback.
pub struct PipedArchiveBuilder {
|
||||
// Sends the entries to append to the processing thread.
send_compression: Sender<CompressionMessage>,
|
||||
// Tells the cancellation thread which pipe is currently open.
send_cancellation: Sender<CancellationMessage>,
|
||||
// Thread running `run_processing`; yields the temporary archive file.
processing_thread: JoinHandle<anyhow::Result<NamedTempFile>>,
|
||||
// Thread running `run_cancellation`.
cancellation_thread: JoinHandle<()>,
|
||||
}
|
||||
|
||||
/// Work items handled by the processing thread (`run_processing`).
enum CompressionMessage {
|
||||
// Append the directory `path`, then stream `reader` as `path/data.mdb`.
Env { path: PathBuf, reader: std::io::PipeReader },
|
||||
// Append the file found at `path`.
File { path: PathBuf },
|
||||
// Append the directory entry `path`, without its contents.
Dir { path: PathBuf },
|
||||
}
|
||||
|
||||
impl PipedArchiveBuilder {
|
||||
pub fn new<F>(dest_dir: PathBuf, base_path: PathBuf, must_stop_processing: F) -> Self
|
||||
where
|
||||
F: Fn() -> bool + Send + 'static,
|
||||
{
|
||||
let (send_compression, recv) = std::sync::mpsc::channel();
|
||||
let processing_thread = std::thread::Builder::new()
|
||||
.name("piped-archive-builder".into())
|
||||
.spawn(|| Self::run_processing(dest_dir, recv, base_path))
|
||||
.unwrap();
|
||||
|
||||
let (send_cancellation, recv) = std::sync::mpsc::channel();
|
||||
|
||||
let cancellation_thread = std::thread::Builder::new()
|
||||
.name("piped-archive-builder-cancellation".into())
|
||||
.spawn(|| Self::run_cancellation(must_stop_processing, recv))
|
||||
.unwrap();
|
||||
|
||||
Self { send_compression, send_cancellation, processing_thread, cancellation_thread }
|
||||
}
|
||||
|
||||
/// Add a heed environment to the archive.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// - Errors originating with that thread:
|
||||
/// - Heed errors, if taking a write transaction fails
|
||||
/// - If the copy of the environment fails.
|
||||
/// - If there is an I/O error opening the database at the environment's path.
|
||||
/// - Errors originating with another thread:
|
||||
/// - If the cancellation thread panicked or otherwise dropped its receiver.
|
||||
/// - If the processing thread panicked or otherwise dropped its receiver.
|
||||
pub fn add_env_to_archive<T>(&mut self, env: &Env<T>) -> anyhow::Result<()> {
|
||||
let (reader, writer) = std::io::pipe()?;
|
||||
let path = env.path().to_path_buf();
|
||||
// make sure that the environment cannot change while it is being added to the archive,
|
||||
// as any concurrent change would corrupt the copy.
|
||||
let env_wtxn = env.write_txn()?;
|
||||
|
||||
// SAFETY: only the cancellation thread has the actual responsibility of closing the pipe since
|
||||
// the clone is `ManuallyDrop`.
|
||||
let mut cloned_writer = unsafe {
|
||||
let writer_raw_fd = writer.as_raw_fd();
|
||||
ManuallyDrop::new(PipeWriter::from_raw_fd(writer_raw_fd))
|
||||
};
|
||||
|
||||
self.send_cancellation.send(CancellationMessage::OpenedPipe { pipe: writer });
|
||||
|
||||
self.send_compression.send(CompressionMessage::Env { path, reader });
|
||||
|
||||
let mdb_path = env.path().join("data.mdb");
|
||||
let mut file = std::fs::File::open(&mdb_path)?;
|
||||
let mut file = std::io::BufReader::with_capacity(16 * 4096, &mut file);
|
||||
std::io::copy(&mut file, cloned_writer.deref_mut())?;
|
||||
|
||||
self.send_cancellation.send(CancellationMessage::ClosingPipe);
|
||||
|
||||
// no change we might want to commit
|
||||
env_wtxn.abort();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Add a file to the archive
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// - If the processing thread panicked or otherwise dropped its receiver.
|
||||
pub fn add_file_to_archive(&mut self, path: PathBuf) -> anyhow::Result<()> {
|
||||
self.send_compression.send(CompressionMessage::File { path });
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Add a directory name (**without its contents**) to the archive.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// - If the processing thread panicked or otherwise dropped its receiver.
|
||||
pub fn add_dir_to_archive(&mut self, path: PathBuf) -> anyhow::Result<()> {
|
||||
self.send_compression.send(CompressionMessage::Dir { path });
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Finalize the archive and persists it to disk.
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// - Originating with the current thread:
|
||||
/// - If persisting the archive fails
|
||||
/// - Originating with another thread:
|
||||
/// - If the cancellation thread panicked.
|
||||
/// - If the processing thread panicked or otherwise terminated in error.
|
||||
pub fn finish(self, dest_path: &Path) -> anyhow::Result<File> {
|
||||
drop(self.send_cancellation);
|
||||
drop(self.send_compression);
|
||||
/// FIXME catch panics
|
||||
let temp_archive = self.processing_thread.join().unwrap()?;
|
||||
self.cancellation_thread.join().unwrap();
|
||||
let archive = temp_archive.persist(dest_path)?;
|
||||
Ok(archive)
|
||||
}
|
||||
|
||||
fn run_processing(
|
||||
dest_dir: PathBuf,
|
||||
recv: Receiver<CompressionMessage>,
|
||||
base_path: PathBuf,
|
||||
) -> anyhow::Result<NamedTempFile> {
|
||||
let mut temp_archive = tempfile::NamedTempFile::new_in(&dest_dir)?;
|
||||
|
||||
let gz_encoder = GzEncoder::new(&mut temp_archive, Compression::default());
|
||||
let mut tar_encoder = Builder::new(gz_encoder);
|
||||
let base_path_in_archive = PathInArchive::from_absolute_and_base(&base_path, &base_path);
|
||||
// add the root
|
||||
tar_encoder.append_dir(base_path_in_archive.as_path(), &base_path)?;
|
||||
while let Ok(message) = recv.recv() {
|
||||
match message {
|
||||
CompressionMessage::Env { path, reader } => {
|
||||
let dir_path_in_archive =
|
||||
PathInArchive::from_absolute_and_base(&path, &base_path);
|
||||
|
||||
tar_encoder.append_dir(dir_path_in_archive.as_path(), &path)?;
|
||||
|
||||
let path = path.join("data.mdb");
|
||||
Self::add_to_archive(&mut tar_encoder, &path, &base_path, reader)?;
|
||||
}
|
||||
CompressionMessage::File { path } => {
|
||||
let path_in_archive = PathInArchive::from_absolute_and_base(&path, &base_path);
|
||||
tar_encoder.append_path_with_name(&path, path_in_archive.as_path())?;
|
||||
}
|
||||
CompressionMessage::Dir { path } => {
|
||||
let path_in_archive = PathInArchive::from_absolute_and_base(&path, &base_path);
|
||||
|
||||
tar_encoder.append_dir(path_in_archive.as_path(), &path)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let gz_encoder = tar_encoder.into_inner()?;
|
||||
gz_encoder.finish()?;
|
||||
temp_archive.flush()?;
|
||||
Ok(temp_archive)
|
||||
}
|
||||
|
||||
fn run_cancellation<F>(must_stop_processing: F, recv: Receiver<CancellationMessage>)
|
||||
where
|
||||
F: Fn() -> bool + Send + 'static,
|
||||
{
|
||||
let mut current_pipe = None;
|
||||
loop {
|
||||
let next_message = match recv.recv_timeout(std::time::Duration::from_secs(60)) {
|
||||
Ok(message) => message,
|
||||
Err(RecvTimeoutError::Disconnected) => break,
|
||||
Err(RecvTimeoutError::Timeout) => {
|
||||
if must_stop_processing() {
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
};
|
||||
match next_message {
|
||||
CancellationMessage::OpenedPipe { pipe } => current_pipe = Some(pipe),
|
||||
CancellationMessage::ClosingPipe => current_pipe = None,
|
||||
}
|
||||
}
|
||||
drop(current_pipe);
|
||||
}
|
||||
|
||||
fn add_to_archive(
|
||||
tar_encoder: &mut Builder<impl Write>,
|
||||
path: &Path,
|
||||
base: &Path,
|
||||
reader: impl Read,
|
||||
) -> anyhow::Result<()> {
|
||||
let stats = path.metadata()?;
|
||||
let mut header = Header::new_gnu();
|
||||
header.set_metadata_in_mode(&stats, tar::HeaderMode::Complete);
|
||||
let path_in_archive = PathInArchive::from_absolute_and_base(path, base);
|
||||
|
||||
tar_encoder.append_data(&mut header, path_in_archive.as_path(), reader)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Messages handled by the cancellation thread (`run_cancellation`).
enum CancellationMessage {
|
||||
// Hands over the write end of a freshly opened pipe; the thread keeps it until told otherwise.
OpenedPipe { pipe: PipeWriter },
|
||||
// Asks the thread to drop the pipe it currently holds.
ClosingPipe,
|
||||
}
|
||||
|
||||
/// A path as it is stored inside the archive.
struct PathInArchive(PathBuf);

impl PathInArchive {
    /// Computes the archive path of `absolute`, relative to `base`.
    ///
    /// Both paths are canonicalized when possible so that symlinks and `..`
    /// components do not prevent `base` from being recognized as a prefix.
    /// A path that cannot be canonicalized (for instance because it does not
    /// exist) is used as given instead of panicking.
    ///
    /// Returns `./<remainder>` when `absolute` lies under `base`, and
    /// `absolute` unchanged otherwise.
    pub fn from_absolute_and_base(absolute: &Path, base: &Path) -> Self {
        let canonical = absolute.canonicalize().unwrap_or_else(|_| absolute.to_path_buf());
        let canonical_base = base.canonicalize().unwrap_or_else(|_| base.to_path_buf());

        let relative = match canonical.strip_prefix(&canonical_base) {
            Ok(stripped) => Path::new(".").join(stripped),
            Err(_) => absolute.to_path_buf(),
        };

        Self(relative)
    }

    /// Borrows the path to use for the archive entry.
    pub fn as_path(&self) -> &Path {
        self.0.as_path()
    }
}
|
||||
|
||||
@@ -19,7 +19,7 @@ bstr = "1.12.0"
|
||||
bytemuck = { version = "1.23.1", features = ["extern_crate_alloc"] }
|
||||
byteorder = "1.5.0"
|
||||
charabia = { version = "0.9.7", default-features = false }
|
||||
cellulite = { version = "0.3.1-nested-rtxns", git = "https://github.com/meilisearch/cellulite", "branch" = "use-heed-nested-rtxns" }
|
||||
cellulite = "0.3.0"
|
||||
concat-arrays = "0.1.2"
|
||||
convert_case = "0.8.0"
|
||||
crossbeam-channel = "0.5.15"
|
||||
@@ -34,7 +34,7 @@ grenad = { version = "0.5.0", default-features = false, features = [
|
||||
"rayon",
|
||||
"tempfile",
|
||||
] }
|
||||
heed = { version = "0.22.1-nested-rtxns", default-features = false, features = [
|
||||
heed = { version = "0.22.0", default-features = false, features = [
|
||||
"serde-json",
|
||||
"serde-bincode",
|
||||
] }
|
||||
@@ -89,8 +89,8 @@ rhai = { version = "1.22.2", features = [
|
||||
"no_time",
|
||||
"sync",
|
||||
] }
|
||||
arroy = { version = "0.6.4-nested-rtxns", git = "https://github.com/meilisearch/arroy", "branch" = "use-heed-nested-rtxns" }
|
||||
hannoy = { version = "0.0.9-nested-rtxns", git = "https://github.com/nnethercott/hannoy", "branch" = "use-heed-nested-rtxns", features = ["arroy"] }
|
||||
arroy = "0.6.3"
|
||||
hannoy = { version = "0.0.8", features = ["arroy"] }
|
||||
rand = "0.8.5"
|
||||
tracing = "0.1.41"
|
||||
ureq = { version = "2.12.1", features = ["json"] }
|
||||
|
||||
@@ -1983,6 +1983,11 @@ impl Index {
|
||||
|
||||
Ok(sizes)
|
||||
}
|
||||
|
||||
/// Borrows the underlying heed environment of this index, for raw access.
|
||||
pub fn raw_env(&self) -> &heed::Env<WithoutTls> {
|
||||
&self.env
|
||||
}
|
||||
}
|
||||
|
||||
pub struct EmbeddingsWithMetadata {
|
||||
|
||||
@@ -180,15 +180,12 @@ where
|
||||
})
|
||||
.unwrap()?;
|
||||
|
||||
pool.install(|| {
|
||||
post_processing::post_process(
|
||||
indexing_context,
|
||||
wtxn,
|
||||
global_fields_ids_map,
|
||||
facet_field_ids_delta,
|
||||
)
|
||||
})
|
||||
.unwrap()?;
|
||||
post_processing::post_process(
|
||||
indexing_context,
|
||||
wtxn,
|
||||
global_fields_ids_map,
|
||||
facet_field_ids_delta,
|
||||
)?;
|
||||
|
||||
indexing_context.progress.update_progress(IndexingStep::BuildingGeoJson);
|
||||
index.cellulite.build(
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use std::cmp::Ordering;
|
||||
|
||||
use facet_bulk::generate_facet_levels;
|
||||
use heed::types::{Bytes, DecodeIgnore, Str};
|
||||
use heed::RwTxn;
|
||||
use itertools::{merge_join_by, EitherOrBoth};
|
||||
@@ -24,8 +23,6 @@ use crate::update::new::FacetFieldIdsDelta;
|
||||
use crate::update::{FacetsUpdateBulk, GrenadParameters};
|
||||
use crate::{GlobalFieldsIdsMap, Index, Result};
|
||||
|
||||
mod facet_bulk;
|
||||
|
||||
pub(super) fn post_process<MSP>(
|
||||
indexing_context: IndexingContext<MSP>,
|
||||
wtxn: &mut RwTxn<'_>,
|
||||
@@ -242,8 +239,9 @@ fn compute_facet_level_database(
|
||||
match delta {
|
||||
FacetFieldIdDelta::Bulk => {
|
||||
progress.update_progress(PostProcessingFacets::StringsBulk);
|
||||
tracing::debug!(%fid, "bulk string facet processing in parallel");
|
||||
generate_facet_levels(index, wtxn, fid, FacetType::String)?
|
||||
tracing::debug!(%fid, "bulk string facet processing");
|
||||
FacetsUpdateBulk::new_not_updating_level_0(index, vec![fid], FacetType::String)
|
||||
.execute(wtxn)?
|
||||
}
|
||||
FacetFieldIdDelta::Incremental(delta_data) => {
|
||||
progress.update_progress(PostProcessingFacets::StringsIncremental);
|
||||
@@ -1,164 +0,0 @@
|
||||
use std::fs::File;
|
||||
use std::io::BufReader;
|
||||
use std::{iter, mem};
|
||||
|
||||
use grenad::CompressionType;
|
||||
use heed::types::{Bytes, LazyDecode};
|
||||
use heed::{Database, RwTxn};
|
||||
use rayon::prelude::*;
|
||||
use roaring::MultiOps;
|
||||
use tempfile::tempfile;
|
||||
|
||||
use crate::facet::FacetType;
|
||||
use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec, FacetGroupValueCodec};
|
||||
use crate::heed_codec::BytesRefCodec;
|
||||
use crate::update::facet::{FACET_GROUP_SIZE, FACET_MIN_LEVEL_SIZE};
|
||||
use crate::update::{create_writer, writer_into_reader};
|
||||
use crate::{CboRoaringBitmapCodec, FieldId, Index};
|
||||
|
||||
/// Generate the facet level based on the level 0.
|
||||
///
|
||||
/// The function will generate all the group levels from
|
||||
/// the group 1 to the level n until the number of group
|
||||
/// is smaller than the minimum required size.
|
||||
pub fn generate_facet_levels(
|
||||
index: &Index,
|
||||
wtxn: &mut RwTxn,
|
||||
field_id: FieldId,
|
||||
facet_type: FacetType,
|
||||
) -> crate::Result<()> {
|
||||
let db = match facet_type {
|
||||
FacetType::String => index
|
||||
.facet_id_string_docids
|
||||
.remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>()
|
||||
.lazily_decode_data(),
|
||||
FacetType::Number => index
|
||||
.facet_id_f64_docids
|
||||
.remap_key_type::<FacetGroupKeyCodec<BytesRefCodec>>()
|
||||
.lazily_decode_data(),
|
||||
};
|
||||
|
||||
clear_levels(db, wtxn, field_id)?;
|
||||
|
||||
let mut base_level = 0;
|
||||
// That's a do-while loop
|
||||
while {
|
||||
let mut level_size = 0;
|
||||
for reader in compute_level(index, wtxn, db, field_id, base_level)? {
|
||||
let mut cursor = reader.into_cursor()?;
|
||||
while let Some((left_bound, facet_group_value)) = cursor.move_on_next()? {
|
||||
level_size += 1;
|
||||
let level = base_level.checked_add(1).unwrap();
|
||||
let key = FacetGroupKey { field_id, level, left_bound };
|
||||
debug_assert!(
|
||||
db.get(wtxn, &key).transpose().is_none(),
|
||||
"entry must not be there and must have already been deleted: {key:?}"
|
||||
);
|
||||
db.remap_data_type::<Bytes>().put(wtxn, &key, facet_group_value)?;
|
||||
}
|
||||
}
|
||||
|
||||
base_level += 1;
|
||||
|
||||
// If the next level will have the minimum required groups, continue.
|
||||
(level_size / FACET_GROUP_SIZE as usize) >= FACET_MIN_LEVEL_SIZE as usize
|
||||
} {}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Compute the groups of facets from the provided base level
|
||||
/// and write the content into different grenad files.
|
||||
fn compute_level(
|
||||
index: &Index,
|
||||
wtxn: &heed::RwTxn,
|
||||
db: Database<FacetGroupKeyCodec<BytesRefCodec>, LazyDecode<FacetGroupValueCodec>>,
|
||||
field_id: FieldId,
|
||||
base_level: u8,
|
||||
) -> Result<Vec<grenad::Reader<BufReader<File>>>, crate::Error> {
|
||||
let thread_count = rayon::current_num_threads();
|
||||
let rtxns = iter::repeat_with(|| index.env.nested_read_txn(wtxn))
|
||||
.take(thread_count)
|
||||
.collect::<heed::Result<Vec<_>>>()?;
|
||||
|
||||
let range = {
|
||||
// Based on the first possible value for the base level up to
|
||||
// the first possible value for the next level *excluded*.
|
||||
let left = FacetGroupKey::<&[u8]> { field_id, level: base_level, left_bound: &[] };
|
||||
let right = FacetGroupKey::<&[u8]> {
|
||||
field_id,
|
||||
level: base_level.checked_add(1).unwrap(),
|
||||
left_bound: &[],
|
||||
};
|
||||
left..right
|
||||
};
|
||||
|
||||
rtxns
|
||||
.into_par_iter()
|
||||
.enumerate()
|
||||
.map(|(thread_id, rtxn)| {
|
||||
let mut writer = tempfile().map(|f| create_writer(CompressionType::None, None, f))?;
|
||||
|
||||
let mut left_bound = None;
|
||||
let mut group_docids = Vec::new();
|
||||
let mut ser_buffer = Vec::new();
|
||||
for (i, result) in db.range(&rtxn, &range)?.enumerate() {
|
||||
let (key, lazy_value) = result?;
|
||||
|
||||
let start_of_group = i % FACET_GROUP_SIZE as usize == 0;
|
||||
let group_index = i / FACET_GROUP_SIZE as usize;
|
||||
let group_for_thread = group_index % thread_count == thread_id;
|
||||
|
||||
if group_for_thread {
|
||||
if start_of_group {
|
||||
if let Some(left_bound) = left_bound.take() {
|
||||
// We store the bitmaps in a Vec this way we can use
|
||||
// the MultiOps operations that tends to be more efficient
|
||||
// for unions. The Vec is empty after the operation.
|
||||
//
|
||||
// We also don't forget to store the group size corresponding
|
||||
// to the number of entries merged in this group.
|
||||
ser_buffer.clear();
|
||||
let group_len: u8 = group_docids.len().try_into().unwrap();
|
||||
ser_buffer.push(group_len);
|
||||
let group_docids = mem::take(&mut group_docids);
|
||||
let docids = group_docids.into_iter().union();
|
||||
CboRoaringBitmapCodec::serialize_into_vec(&docids, &mut ser_buffer);
|
||||
writer.insert(left_bound, &ser_buffer)?;
|
||||
}
|
||||
left_bound = Some(key.left_bound);
|
||||
}
|
||||
|
||||
// Lazily decode the bitmaps we are interested in.
|
||||
let value = lazy_value.decode().map_err(heed::Error::Decoding)?;
|
||||
group_docids.push(value.bitmap);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(left_bound) = left_bound.take() {
|
||||
ser_buffer.clear();
|
||||
// We don't forget to store the group size corresponding
|
||||
// to the number of entries merged in this group.
|
||||
let group_len: u8 = group_docids.len().try_into().unwrap();
|
||||
ser_buffer.push(group_len);
|
||||
let group_docids = group_docids.into_iter().union();
|
||||
CboRoaringBitmapCodec::serialize_into_vec(&group_docids, &mut ser_buffer);
|
||||
writer.insert(left_bound, &ser_buffer)?;
|
||||
}
|
||||
|
||||
writer_into_reader(writer)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Clears all the levels and only keeps the level 0 of the specified field id.
|
||||
fn clear_levels(
|
||||
db: Database<FacetGroupKeyCodec<BytesRefCodec>, LazyDecode<FacetGroupValueCodec>>,
|
||||
wtxn: &mut RwTxn<'_>,
|
||||
field_id: FieldId,
|
||||
) -> heed::Result<()> {
|
||||
let left = FacetGroupKey::<&[u8]> { field_id, level: 1, left_bound: &[] };
|
||||
let right = FacetGroupKey::<&[u8]> { field_id, level: u8::MAX, left_bound: &[] };
|
||||
let range = left..=right;
|
||||
db.delete_range(wtxn, &range).map(drop)
|
||||
}
|
||||
@@ -1,12 +1,11 @@
|
||||
use std::cell::RefCell;
|
||||
use std::collections::BTreeSet;
|
||||
use std::io::{BufReader, BufWriter, Read, Seek, Write};
|
||||
use std::iter;
|
||||
|
||||
use hashbrown::HashMap;
|
||||
use heed::types::Bytes;
|
||||
use heed::{BytesDecode, Database, Error, RoTxn, RwTxn};
|
||||
use rayon::iter::{IndexedParallelIterator as _, IntoParallelIterator, ParallelIterator as _};
|
||||
use rayon::iter::{IntoParallelIterator, ParallelIterator as _};
|
||||
use roaring::MultiOps;
|
||||
use tempfile::spooled_tempfile;
|
||||
use thread_local::ThreadLocal;
|
||||
@@ -152,35 +151,25 @@ impl<'a, 'rtxn> FrozenPrefixBitmaps<'a, 'rtxn> {
|
||||
|
||||
unsafe impl Sync for FrozenPrefixBitmaps<'_, '_> {}
|
||||
|
||||
struct WordPrefixIntegerDocids<'i> {
|
||||
index: &'i Index,
|
||||
struct WordPrefixIntegerDocids {
|
||||
database: Database<Bytes, CboRoaringBitmapCodec>,
|
||||
prefix_database: Database<Bytes, CboRoaringBitmapCodec>,
|
||||
max_memory_by_thread: Option<usize>,
|
||||
read_uncommitted_in_parallel: bool,
|
||||
}
|
||||
|
||||
impl<'i> WordPrefixIntegerDocids<'i> {
|
||||
impl WordPrefixIntegerDocids {
|
||||
fn new(
|
||||
index: &'i Index,
|
||||
database: Database<Bytes, CboRoaringBitmapCodec>,
|
||||
prefix_database: Database<Bytes, CboRoaringBitmapCodec>,
|
||||
grenad_parameters: &'_ GrenadParameters,
|
||||
) -> WordPrefixIntegerDocids<'i> {
|
||||
grenad_parameters: &GrenadParameters,
|
||||
) -> WordPrefixIntegerDocids {
|
||||
WordPrefixIntegerDocids {
|
||||
index,
|
||||
database,
|
||||
prefix_database,
|
||||
max_memory_by_thread: grenad_parameters.max_memory_by_thread(),
|
||||
read_uncommitted_in_parallel: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Use an experimental LMDB feature to read uncommitted data in parallel.
|
||||
fn read_uncommitted_in_parallel(&mut self, value: bool) {
|
||||
self.read_uncommitted_in_parallel = value;
|
||||
}
|
||||
|
||||
fn execute(
|
||||
self,
|
||||
wtxn: &mut heed::RwTxn,
|
||||
@@ -188,144 +177,7 @@ impl<'i> WordPrefixIntegerDocids<'i> {
|
||||
prefix_to_delete: &BTreeSet<Prefix>,
|
||||
) -> Result<()> {
|
||||
delete_prefixes(wtxn, &self.prefix_database, prefix_to_delete)?;
|
||||
if self.read_uncommitted_in_parallel {
|
||||
self.recompute_modified_prefixes_no_frozen(wtxn, prefix_to_compute)
|
||||
} else {
|
||||
self.recompute_modified_prefixes(wtxn, prefix_to_compute)
|
||||
}
|
||||
}
|
||||
|
||||
/// Computes the same as `recompute_modified_prefixes`.
|
||||
///
|
||||
/// ...but without aggregating the prefixes mmap pointers into a static HashMap
|
||||
/// beforehand and rather use an experimental LMDB feature to read the subset
|
||||
/// of prefixes in parallel from the uncommitted transaction.
|
||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
|
||||
fn recompute_modified_prefixes_no_frozen(
|
||||
&self,
|
||||
wtxn: &mut RwTxn,
|
||||
prefixes: &BTreeSet<Prefix>,
|
||||
) -> Result<()> {
|
||||
let thread_count = rayon::current_num_threads();
|
||||
let rtxns = iter::repeat_with(|| self.index.env.nested_read_txn(wtxn))
|
||||
.take(thread_count)
|
||||
.collect::<heed::Result<Vec<_>>>()?;
|
||||
|
||||
let outputs = rtxns
|
||||
.into_par_iter()
|
||||
.enumerate()
|
||||
.map(|(thread_id, rtxn)| {
|
||||
// `indexes` represent offsets at which prefixes computations were stored in the `file`.
|
||||
let mut indexes = Vec::new();
|
||||
let mut file = BufWriter::new(spooled_tempfile(
|
||||
self.max_memory_by_thread.unwrap_or(usize::MAX),
|
||||
));
|
||||
|
||||
let mut buffer = Vec::new();
|
||||
for (prefix_index, prefix) in prefixes.iter().enumerate() {
|
||||
// Is prefix for another thread?
|
||||
if prefix_index % thread_count != thread_id {
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut bitmaps_bytes = Vec::<&[u8]>::new();
|
||||
let mut prev_pos = None;
|
||||
for result in self
|
||||
.database
|
||||
.remap_data_type::<Bytes>()
|
||||
.prefix_iter(&rtxn, prefix.as_bytes())?
|
||||
{
|
||||
let (key, current_bitmap_bytes) = result?;
|
||||
let (_word, pos) =
|
||||
StrBEU16Codec::bytes_decode(key).map_err(Error::Decoding)?;
|
||||
|
||||
if prev_pos.is_some_and(|p| p != pos) {
|
||||
if bitmaps_bytes.is_empty() {
|
||||
indexes.push(PrefixIntegerEntry {
|
||||
prefix,
|
||||
pos,
|
||||
serialized_length: None,
|
||||
});
|
||||
} else {
|
||||
let output = bitmaps_bytes
|
||||
.iter()
|
||||
.map(|bytes| CboRoaringBitmapCodec::deserialize_from(bytes))
|
||||
.union()?;
|
||||
buffer.clear();
|
||||
CboRoaringBitmapCodec::serialize_into_vec(&output, &mut buffer);
|
||||
indexes.push(PrefixIntegerEntry {
|
||||
prefix,
|
||||
pos,
|
||||
serialized_length: Some(buffer.len()),
|
||||
});
|
||||
file.write_all(&buffer)?;
|
||||
bitmaps_bytes.clear();
|
||||
}
|
||||
}
|
||||
|
||||
bitmaps_bytes.push(current_bitmap_bytes);
|
||||
prev_pos = Some(pos);
|
||||
}
|
||||
|
||||
if let Some(pos) = prev_pos {
|
||||
if bitmaps_bytes.is_empty() {
|
||||
indexes.push(PrefixIntegerEntry {
|
||||
prefix,
|
||||
pos,
|
||||
serialized_length: None,
|
||||
});
|
||||
} else {
|
||||
let output = bitmaps_bytes
|
||||
.iter()
|
||||
.map(|bytes| CboRoaringBitmapCodec::deserialize_from(bytes))
|
||||
.union()?;
|
||||
buffer.clear();
|
||||
CboRoaringBitmapCodec::serialize_into_vec(&output, &mut buffer);
|
||||
indexes.push(PrefixIntegerEntry {
|
||||
prefix,
|
||||
pos,
|
||||
serialized_length: Some(buffer.len()),
|
||||
});
|
||||
file.write_all(&buffer)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok((indexes, file))
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
// We iterate over all the collected and serialized bitmaps through
|
||||
// the files and entries to eventually put them in the final database.
|
||||
let mut key_buffer = Vec::new();
|
||||
let mut buffer = Vec::new();
|
||||
for (index, file) in outputs {
|
||||
let mut file = file.into_inner().map_err(|e| e.into_error())?;
|
||||
file.rewind()?;
|
||||
let mut file = BufReader::new(file);
|
||||
for PrefixIntegerEntry { prefix, pos, serialized_length } in index {
|
||||
key_buffer.clear();
|
||||
key_buffer.extend_from_slice(prefix.as_bytes());
|
||||
key_buffer.push(0);
|
||||
key_buffer.extend_from_slice(&pos.to_be_bytes());
|
||||
match serialized_length {
|
||||
Some(serialized_length) => {
|
||||
buffer.resize(serialized_length, 0);
|
||||
file.read_exact(&mut buffer)?;
|
||||
self.prefix_database.remap_data_type::<Bytes>().put(
|
||||
wtxn,
|
||||
&key_buffer,
|
||||
&buffer,
|
||||
)?;
|
||||
}
|
||||
None => {
|
||||
self.prefix_database.delete(wtxn, &key_buffer)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
self.recompute_modified_prefixes(wtxn, prefix_to_compute)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
|
||||
@@ -410,7 +262,7 @@ impl<'i> WordPrefixIntegerDocids<'i> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents a prefix and the length the bitmap takes on disk.
|
||||
/// Represents a prefix and the length the bitmap takes on disk.
|
||||
struct PrefixIntegerEntry<'a> {
|
||||
prefix: &'a str,
|
||||
pos: u16,
|
||||
@@ -510,14 +362,12 @@ pub fn compute_word_prefix_fid_docids(
|
||||
prefix_to_delete: &BTreeSet<Prefix>,
|
||||
grenad_parameters: &GrenadParameters,
|
||||
) -> Result<()> {
|
||||
let mut builder = WordPrefixIntegerDocids::new(
|
||||
index,
|
||||
WordPrefixIntegerDocids::new(
|
||||
index.word_fid_docids.remap_key_type(),
|
||||
index.word_prefix_fid_docids.remap_key_type(),
|
||||
grenad_parameters,
|
||||
);
|
||||
builder.read_uncommitted_in_parallel(true);
|
||||
builder.execute(wtxn, prefix_to_compute, prefix_to_delete)
|
||||
)
|
||||
.execute(wtxn, prefix_to_compute, prefix_to_delete)
|
||||
}
|
||||
|
||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
|
||||
@@ -528,12 +378,10 @@ pub fn compute_word_prefix_position_docids(
|
||||
prefix_to_delete: &BTreeSet<Prefix>,
|
||||
grenad_parameters: &GrenadParameters,
|
||||
) -> Result<()> {
|
||||
let mut builder = WordPrefixIntegerDocids::new(
|
||||
index,
|
||||
WordPrefixIntegerDocids::new(
|
||||
index.word_position_docids.remap_key_type(),
|
||||
index.word_prefix_position_docids.remap_key_type(),
|
||||
grenad_parameters,
|
||||
);
|
||||
builder.read_uncommitted_in_parallel(true);
|
||||
builder.execute(wtxn, prefix_to_compute, prefix_to_delete)
|
||||
)
|
||||
.execute(wtxn, prefix_to_compute, prefix_to_delete)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user