Mirror of https://github.com/meilisearch/meilisearch.git (synced 2025-11-28 00:40:31 +00:00)

Compare commits: 14 commits, release-v1 ... prototype-
| Author | SHA1 | Date |
|---|---|---|
| | 60bfd3aef1 | |
| | 5027eea1a8 | |
| | 5079fb4b14 | |
| | 8e016fbfeb | |
| | 1ccde9bf0b | |
| | 34e814f400 | |
| | 552127021f | |
| | b4d7d80ad9 | |
| | 5204c0b60b | |
| | e73cd692db | |
| | 29b453346b | |
| | c4bb435374 | |
| | 2bcff2ea46 | |
| | 1275e72e0b | |
.github/workflows/sdks-tests.yml (vendored): 38 changes
@@ -22,7 +22,7 @@ jobs:
outputs:
docker-image: ${{ steps.define-image.outputs.docker-image }}
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Define the Docker image we need to use
id: define-image
run: |

@@ -46,11 +46,11 @@ jobs:
MEILISEARCH_VERSION: ${{ needs.define-docker-image.outputs.docker-image }}

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
repository: meilisearch/meilisearch-dotnet
- name: Setup .NET Core
uses: actions/setup-dotnet@v3
uses: actions/setup-dotnet@v4
with:
dotnet-version: "6.0.x"
- name: Install dependencies

@@ -75,12 +75,12 @@ jobs:
ports:
- '7700:7700'
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
repository: meilisearch/meilisearch-dart
- uses: dart-lang/setup-dart@v1
with:
sdk: 3.1.1
sdk: 'latest'
- name: Install dependencies
run: dart pub get
- name: Run integration tests

@@ -100,10 +100,10 @@ jobs:
- '7700:7700'
steps:
- name: Set up Go
uses: actions/setup-go@v4
uses: actions/setup-go@v5
with:
go-version: stable
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
repository: meilisearch/meilisearch-go
- name: Get dependencies

@@ -129,11 +129,11 @@ jobs:
ports:
- '7700:7700'
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
repository: meilisearch/meilisearch-java
- name: Set up Java
uses: actions/setup-java@v3
uses: actions/setup-java@v4
with:
java-version: 8
distribution: 'zulu'

@@ -156,7 +156,7 @@ jobs:
ports:
- '7700:7700'
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
repository: meilisearch/meilisearch-js
- name: Setup node

@@ -191,7 +191,7 @@ jobs:
ports:
- '7700:7700'
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
repository: meilisearch/meilisearch-php
- name: Install PHP

@@ -220,11 +220,11 @@ jobs:
ports:
- '7700:7700'
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
repository: meilisearch/meilisearch-python
- name: Set up Python
uses: actions/setup-python@v4
uses: actions/setup-python@v5
- name: Install pipenv
uses: dschep/install-pipenv-action@v1
- name: Install dependencies

@@ -245,7 +245,7 @@ jobs:
ports:
- '7700:7700'
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
repository: meilisearch/meilisearch-ruby
- name: Set up Ruby 3

@@ -270,7 +270,7 @@ jobs:
ports:
- '7700:7700'
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
repository: meilisearch/meilisearch-rust
- name: Build

@@ -291,7 +291,7 @@ jobs:
ports:
- '7700:7700'
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
repository: meilisearch/meilisearch-swift
- name: Run tests

@@ -314,7 +314,7 @@ jobs:
ports:
- '7700:7700'
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
repository: meilisearch/meilisearch-js-plugins
- name: Setup node

@@ -345,7 +345,7 @@ jobs:
ports:
- '7700:7700'
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
repository: meilisearch/meilisearch-rails
- name: Set up Ruby 3

@@ -369,7 +369,7 @@ jobs:
ports:
- '7700:7700'
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
repository: meilisearch/meilisearch-symfony
- name: Install PHP

LICENSE: 2 changes
@@ -1,6 +1,6 @@
MIT License

Copyright (c) 2019-2022 Meili SAS
Copyright (c) 2019-2024 Meili SAS

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

@@ -42,7 +42,7 @@ Meilisearch helps you shape a delightful search experience in a snap, offering f

- **Search-as-you-type:** find search results in less than 50 milliseconds
- **[Typo tolerance](https://www.meilisearch.com/docs/learn/getting_started/customizing_relevancy?utm_campaign=oss&utm_source=github&utm_medium=meilisearch&utm_content=features#typo-tolerance):** get relevant matches even when queries contain typos and misspellings
- **[Filtering](https://www.meilisearch.com/docs/learn/fine_tuning_results/filtering?utm_campaign=oss&utm_source=github&utm_medium=meilisearch&utm_content=features) and [faceted search](https://www.meilisearch.com/docs/learn/fine_tuning_results/faceted_search?utm_campaign=oss&utm_source=github&utm_medium=meilisearch&utm_content=features):** enhance your user's search experience with custom filters and build a faceted search interface in a few lines of code
- **[Filtering](https://www.meilisearch.com/docs/learn/fine_tuning_results/filtering?utm_campaign=oss&utm_source=github&utm_medium=meilisearch&utm_content=features) and [faceted search](https://www.meilisearch.com/docs/learn/fine_tuning_results/faceted_search?utm_campaign=oss&utm_source=github&utm_medium=meilisearch&utm_content=features):** enhance your users' search experience with custom filters and build a faceted search interface in a few lines of code
- **[Sorting](https://www.meilisearch.com/docs/learn/fine_tuning_results/sorting?utm_campaign=oss&utm_source=github&utm_medium=meilisearch&utm_content=features):** sort results based on price, date, or pretty much anything else your users need
- **[Synonym support](https://www.meilisearch.com/docs/learn/getting_started/customizing_relevancy?utm_campaign=oss&utm_source=github&utm_medium=meilisearch&utm_content=features#synonyms):** configure synonyms to include more relevant content in your search results
- **[Geosearch](https://www.meilisearch.com/docs/learn/fine_tuning_results/geosearch?utm_campaign=oss&utm_source=github&utm_medium=meilisearch&utm_content=features):** filter and sort documents based on geographic data

@@ -60,7 +60,7 @@ pub(crate) enum Batch {
/// The list of tasks that were processing when this task cancelation appeared.
previous_processing_tasks: RoaringBitmap,
},
TaskDeletion(Task),
TaskDeletions(Vec<Task>),
SnapshotCreation(Vec<Task>),
Dump(Task),
IndexOperation {

@@ -146,13 +146,12 @@ impl Batch {
pub fn ids(&self) -> Vec<TaskId> {
match self {
Batch::TaskCancelation { task, .. }
| Batch::TaskDeletion(task)
| Batch::Dump(task)
| Batch::IndexCreation { task, .. }
| Batch::IndexUpdate { task, .. } => vec![task.uid],
Batch::SnapshotCreation(tasks) | Batch::IndexDeletion { tasks, .. } => {
tasks.iter().map(|task| task.uid).collect()
}
Batch::SnapshotCreation(tasks)
| Batch::TaskDeletions(tasks)
| Batch::IndexDeletion { tasks, .. } => tasks.iter().map(|task| task.uid).collect(),
Batch::IndexOperation { op, .. } => match op {
IndexOperation::DocumentOperation { tasks, .. }
| IndexOperation::Settings { tasks, .. }

@@ -180,7 +179,7 @@ impl Batch {
use Batch::*;
match self {
TaskCancelation { .. }
| TaskDeletion(_)
| TaskDeletions(_)
| SnapshotCreation(_)
| Dump(_)
| IndexSwap { .. } => None,

@@ -199,7 +198,7 @@ impl fmt::Display for Batch {
let tasks = self.ids();
match self {
Batch::TaskCancelation { .. } => f.write_str("TaskCancelation")?,
Batch::TaskDeletion(_) => f.write_str("TaskDeletion")?,
Batch::TaskDeletions(_) => f.write_str("TaskDeletion")?,
Batch::SnapshotCreation(_) => f.write_str("SnapshotCreation")?,
Batch::Dump(_) => f.write_str("Dump")?,
Batch::IndexOperation { op, .. } => write!(f, "{op}")?,

@@ -539,9 +538,9 @@ impl IndexScheduler {

// 2. we get the next task to delete
let to_delete = self.get_kind(rtxn, Kind::TaskDeletion)? & enqueued;
if let Some(task_id) = to_delete.min() {
let task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
return Ok(Some(Batch::TaskDeletion(task)));
if !to_delete.is_empty() {
let tasks = self.get_existing_tasks(rtxn, to_delete)?;
return Ok(Some(Batch::TaskDeletions(tasks)));
}

// 3. we batch the snapshot.

@@ -681,31 +680,43 @@ impl IndexScheduler {

Ok(vec![task])
}
Batch::TaskDeletion(mut task) => {
Batch::TaskDeletions(mut tasks) => {
// 1. Retrieve the tasks that matched the query at enqueue-time.
let matched_tasks =
let mut matched_tasks = RoaringBitmap::new();

for task in tasks.iter() {
if let KindWithContent::TaskDeletion { tasks, query: _ } = &task.kind {
tasks
matched_tasks |= tasks;
} else {
unreachable!()
}
}

let mut wtxn = self.env.write_txn()?;
let mut deleted_tasks = self.delete_matched_tasks(&mut wtxn, &matched_tasks)?;
wtxn.commit()?;

for task in tasks.iter_mut() {
task.status = Status::Succeeded;
let KindWithContent::TaskDeletion { tasks, query: _ } = &task.kind else {
unreachable!()
};

let mut wtxn = self.env.write_txn()?;
let deleted_tasks_count = self.delete_matched_tasks(&mut wtxn, matched_tasks)?;
let deleted_tasks_count = deleted_tasks.intersection_len(tasks);
deleted_tasks -= tasks;

task.status = Status::Succeeded;
match &mut task.details {
Some(Details::TaskDeletion {
matched_tasks: _,
deleted_tasks,
original_filter: _,
}) => {
*deleted_tasks = Some(deleted_tasks_count);
match &mut task.details {
Some(Details::TaskDeletion {
matched_tasks: _,
deleted_tasks,
original_filter: _,
}) => {
*deleted_tasks = Some(deleted_tasks_count);
}
_ => unreachable!(),
}
_ => unreachable!(),
}
wtxn.commit()?;
Ok(vec![task])
Ok(tasks)
}
Batch::SnapshotCreation(mut tasks) => {
fs::create_dir_all(&self.snapshots_path)?;

@@ -1435,7 +1446,11 @@ impl IndexScheduler {
/// Delete each given task from all the databases (if it is deleteable).
///
/// Return the number of tasks that were actually deleted.
fn delete_matched_tasks(&self, wtxn: &mut RwTxn, matched_tasks: &RoaringBitmap) -> Result<u64> {
fn delete_matched_tasks(
&self,
wtxn: &mut RwTxn,
matched_tasks: &RoaringBitmap,
) -> Result<RoaringBitmap> {
// 1. Remove from this list the tasks that we are not allowed to delete
let enqueued_tasks = self.get_status(wtxn, Status::Enqueued)?;
let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone();

@@ -1500,7 +1515,7 @@ impl IndexScheduler {
}
}

Ok(to_delete_tasks.len())
Ok(to_delete_tasks)
}

/// Cancel each given task from all the databases (if it is cancelable).

@@ -2244,10 +2244,7 @@ mod tests {
.unwrap();
index_scheduler.assert_internally_consistent();
}
for _ in 0..2 {
handle.advance_one_successful_batch();
index_scheduler.assert_internally_consistent();
}
handle.advance_one_successful_batch();

snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_processed");
}

@@ -34,12 +34,10 @@ catto: { number_of_documents: 1, field_distribution: {"id": 1} }
[timestamp] [3,]
----------------------------------------------------------------------
### Started At:
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [2,3,]
----------------------------------------------------------------------
### Finished At:
[timestamp] [2,]
[timestamp] [3,]
[timestamp] [2,3,]
----------------------------------------------------------------------
### File Store:
00000000-0000-0000-0000-000000000001

@@ -26,7 +26,7 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
obkv_documents: grenad::Reader<R>,
indexer: GrenadParameters,
searchable_fields: &Option<HashSet<FieldId>>,
stop_words: Option<&fst::Set<&[u8]>>,
stop_words: Option<&fst::Set<Vec<u8>>>,
allowed_separators: Option<&[&str]>,
dictionary: Option<&[&str]>,
max_positions_per_attributes: Option<u32>,

@@ -181,11 +181,11 @@ fn searchable_fields_changed(

/// Factorize tokenizer building.
fn tokenizer_builder<'a>(
stop_words: Option<&'a fst::Set<&[u8]>>,
stop_words: Option<&'a fst::Set<Vec<u8>>>,
allowed_separators: Option<&'a [&str]>,
dictionary: Option<&'a [&str]>,
script_language: Option<&'a HashMap<Script, Vec<Language>>>,
) -> TokenizerBuilder<'a, &'a [u8]> {
) -> TokenizerBuilder<'a, Vec<u8>> {
let mut tokenizer_builder = TokenizerBuilder::new();
if let Some(stop_words) = stop_words {
tokenizer_builder.stop_words(stop_words);

@@ -211,7 +211,7 @@ fn lang_safe_tokens_from_document<'a>(
obkv: &KvReader<FieldId>,
searchable_fields: &Option<HashSet<FieldId>>,
tokenizer: &Tokenizer,
stop_words: Option<&fst::Set<&[u8]>>,
stop_words: Option<&fst::Set<Vec<u8>>>,
allowed_separators: Option<&[&str]>,
dictionary: Option<&[&str]>,
max_positions_per_attributes: u32,

@@ -14,7 +14,6 @@ use std::fs::File;
use std::io::BufReader;

use crossbeam_channel::Sender;
use log::debug;
use rayon::prelude::*;

use self::extract_docid_word_positions::extract_docid_word_positions;

@@ -29,10 +28,7 @@ use self::extract_vector_points::{
use self::extract_word_docids::extract_word_docids;
use self::extract_word_pair_proximity_docids::extract_word_pair_proximity_docids;
use self::extract_word_position_docids::extract_word_position_docids;
use super::helpers::{
as_cloneable_grenad, merge_deladd_cbo_roaring_bitmaps, CursorClonableMmap, GrenadParameters,
MergeFn, MergeableReader,
};
use super::helpers::{as_cloneable_grenad, CursorClonableMmap, GrenadParameters};
use super::{helpers, TypedChunk};
use crate::proximity::ProximityPrecision;
use crate::vector::EmbeddingConfigs;

@@ -51,7 +47,7 @@ pub(crate) fn data_from_obkv_documents(
primary_key_id: FieldId,
geo_fields_ids: Option<(FieldId, FieldId)>,
field_id_map: FieldsIdsMap,
stop_words: Option<fst::Set<&[u8]>>,
stop_words: Option<fst::Set<Vec<u8>>>,
allowed_separators: Option<&[&str]>,
dictionary: Option<&[&str]>,
max_positions_per_attributes: Option<u32>,

@@ -61,218 +57,170 @@ pub(crate) fn data_from_obkv_documents(
) -> Result<()> {
puffin::profile_function!();

original_obkv_chunks
.par_bridge()
.map(|original_documents_chunk| {
send_original_documents_data(
original_documents_chunk,
indexer,
lmdb_writer_sx.clone(),
field_id_map.clone(),
embedders.clone(),
)
})
.collect::<Result<()>>()?;

#[allow(clippy::type_complexity)]
let result: Result<(Vec<_>, (Vec<_>, (Vec<_>, (Vec<_>, (Vec<_>, Vec<_>)))))> =
flattened_obkv_chunks
.par_bridge()
.map(|flattened_obkv_chunks| {
send_and_extract_flattened_documents_data(
flattened_obkv_chunks,
indexer,
lmdb_writer_sx.clone(),
&searchable_fields,
&faceted_fields,
primary_key_id,
geo_fields_ids,
&stop_words,
&allowed_separators,
&dictionary,
max_positions_per_attributes,
)
})
.collect();

let (
docid_word_positions_chunks,
(
fid_docid_facet_numbers_chunks,
(
fid_docid_facet_strings_chunks,
(
facet_is_null_docids_chunks,
(facet_is_empty_docids_chunks, facet_exists_docids_chunks),
),
),
),
) = result?;

// merge facet_exists_docids and send them as a typed chunk
{
let lmdb_writer_sx = lmdb_writer_sx.clone();
rayon::spawn(move || {
debug!("merge {} database", "facet-id-exists-docids");
match facet_exists_docids_chunks.merge(merge_deladd_cbo_roaring_bitmaps, &indexer) {
Ok(reader) => {
let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdFacetExistsDocids(reader)));
}
Err(e) => {
let _ = lmdb_writer_sx.send(Err(e));
}
}
});
}

// merge facet_is_null_docids and send them as a typed chunk
{
let lmdb_writer_sx = lmdb_writer_sx.clone();
rayon::spawn(move || {
debug!("merge {} database", "facet-id-is-null-docids");
match facet_is_null_docids_chunks.merge(merge_deladd_cbo_roaring_bitmaps, &indexer) {
Ok(reader) => {
let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdFacetIsNullDocids(reader)));
}
Err(e) => {
let _ = lmdb_writer_sx.send(Err(e));
}
}
});
}

// merge facet_is_empty_docids and send them as a typed chunk
{
let lmdb_writer_sx = lmdb_writer_sx.clone();
rayon::spawn(move || {
debug!("merge {} database", "facet-id-is-empty-docids");
match facet_is_empty_docids_chunks.merge(merge_deladd_cbo_roaring_bitmaps, &indexer) {
Ok(reader) => {
let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdFacetIsEmptyDocids(reader)));
}
Err(e) => {
let _ = lmdb_writer_sx.send(Err(e));
}
}
});
}

if proximity_precision == ProximityPrecision::ByWord {
spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
docid_word_positions_chunks.clone(),
indexer,
lmdb_writer_sx.clone(),
extract_word_pair_proximity_docids,
merge_deladd_cbo_roaring_bitmaps,
TypedChunk::WordPairProximityDocids,
"word-pair-proximity-docids",
);
}

spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
docid_word_positions_chunks.clone(),
indexer,
lmdb_writer_sx.clone(),
extract_fid_word_count_docids,
merge_deladd_cbo_roaring_bitmaps,
TypedChunk::FieldIdWordCountDocids,
"field-id-wordcount-docids",
);

spawn_extraction_task::<
_,
_,
Vec<(
grenad::Reader<BufReader<File>>,
grenad::Reader<BufReader<File>>,
grenad::Reader<BufReader<File>>,
)>,
>(
docid_word_positions_chunks.clone(),
indexer,
lmdb_writer_sx.clone(),
move |doc_word_pos, indexer| extract_word_docids(doc_word_pos, indexer, &exact_attributes),
merge_deladd_cbo_roaring_bitmaps,
|(word_docids_reader, exact_word_docids_reader, word_fid_docids_reader)| {
TypedChunk::WordDocids {
word_docids_reader,
exact_word_docids_reader,
word_fid_docids_reader,
}
let (original_pipeline_result, flattened_pipeline_result): (Result<_>, Result<_>) = rayon::join(
|| {
original_obkv_chunks
.par_bridge()
.map(|original_documents_chunk| {
send_original_documents_data(
original_documents_chunk,
indexer,
lmdb_writer_sx.clone(),
field_id_map.clone(),
embedders.clone(),
)
})
.collect::<Result<()>>()
},
|| {
flattened_obkv_chunks
.par_bridge()
.map(|flattened_obkv_chunks| {
send_and_extract_flattened_documents_data(
flattened_obkv_chunks,
indexer,
lmdb_writer_sx.clone(),
&searchable_fields,
&faceted_fields,
primary_key_id,
geo_fields_ids,
&stop_words,
&allowed_separators,
&dictionary,
max_positions_per_attributes,
)
})
.inspect(|result| {
if proximity_precision == ProximityPrecision::ByWord {
if let Ok((docid_word_positions_chunk, _)) = result {
run_extraction_task::<_, _, grenad::Reader<BufReader<File>>>(
docid_word_positions_chunk.clone(),
indexer,
lmdb_writer_sx.clone(),
extract_word_pair_proximity_docids,
TypedChunk::WordPairProximityDocids,
"word-pair-proximity-docids",
);
}
}
})
.inspect(|result| {
if let Ok((docid_word_positions_chunk, _)) = result {
run_extraction_task::<_, _, grenad::Reader<BufReader<File>>>(
docid_word_positions_chunk.clone(),
indexer,
lmdb_writer_sx.clone(),
extract_fid_word_count_docids,
TypedChunk::FieldIdWordCountDocids,
"field-id-wordcount-docids",
);
}
})
.inspect(|result| {
if let Ok((docid_word_positions_chunk, _)) = result {
let exact_attributes = exact_attributes.clone();
run_extraction_task::<
_,
_,
(
grenad::Reader<BufReader<File>>,
grenad::Reader<BufReader<File>>,
grenad::Reader<BufReader<File>>,
),
>(
docid_word_positions_chunk.clone(),
indexer,
lmdb_writer_sx.clone(),
move |doc_word_pos, indexer| {
extract_word_docids(doc_word_pos, indexer, &exact_attributes)
},
|(
word_docids_reader,
exact_word_docids_reader,
word_fid_docids_reader,
)| {
TypedChunk::WordDocids {
word_docids_reader,
exact_word_docids_reader,
word_fid_docids_reader,
}
},
"word-docids",
);
}
})
.inspect(|result| {
if let Ok((docid_word_positions_chunk, _)) = result {
run_extraction_task::<_, _, grenad::Reader<BufReader<File>>>(
docid_word_positions_chunk.clone(),
indexer,
lmdb_writer_sx.clone(),
extract_word_position_docids,
TypedChunk::WordPositionDocids,
"word-position-docids",
);
}
})
.inspect(|result| {
if let Ok((_, (_, fid_docid_facet_strings_chunk))) = result {
run_extraction_task::<_, _, grenad::Reader<BufReader<File>>>(
fid_docid_facet_strings_chunk.clone(),
indexer,
lmdb_writer_sx.clone(),
extract_facet_string_docids,
TypedChunk::FieldIdFacetStringDocids,
"field-id-facet-string-docids",
);
}
})
.inspect(|result| {
if let Ok((_, (fid_docid_facet_numbers_chunk, _))) = result {
run_extraction_task::<_, _, grenad::Reader<BufReader<File>>>(
fid_docid_facet_numbers_chunk.clone(),
indexer,
lmdb_writer_sx.clone(),
extract_facet_number_docids,
TypedChunk::FieldIdFacetNumberDocids,
"field-id-facet-number-docids",
);
}
})
.map(|r| r.map(|_| ()))
.collect::<Result<()>>()
},
"word-docids",
);

spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
docid_word_positions_chunks.clone(),
indexer,
lmdb_writer_sx.clone(),
extract_word_position_docids,
merge_deladd_cbo_roaring_bitmaps,
TypedChunk::WordPositionDocids,
"word-position-docids",
);

spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
fid_docid_facet_strings_chunks,
indexer,
lmdb_writer_sx.clone(),
extract_facet_string_docids,
merge_deladd_cbo_roaring_bitmaps,
TypedChunk::FieldIdFacetStringDocids,
"field-id-facet-string-docids",
);

spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
fid_docid_facet_numbers_chunks,
indexer,
lmdb_writer_sx,
extract_facet_number_docids,
merge_deladd_cbo_roaring_bitmaps,
TypedChunk::FieldIdFacetNumberDocids,
"field-id-facet-number-docids",
);

Ok(())
original_pipeline_result.and(flattened_pipeline_result)
}

/// Spawn a new task to extract data for a specific DB using extract_fn.
/// Generated grenad chunks are merged using the merge_fn.
/// The result of merged chunks is serialized as TypedChunk using the serialize_fn
/// and sent into lmdb_writer_sx.
fn spawn_extraction_task<FE, FS, M>(
chunks: Vec<grenad::Reader<CursorClonableMmap>>,
fn run_extraction_task<FE, FS, M>(
chunk: grenad::Reader<CursorClonableMmap>,
indexer: GrenadParameters,
lmdb_writer_sx: Sender<Result<TypedChunk>>,
extract_fn: FE,
merge_fn: MergeFn,
serialize_fn: FS,
name: &'static str,
) where
FE: Fn(grenad::Reader<CursorClonableMmap>, GrenadParameters) -> Result<M::Output>
FE: Fn(grenad::Reader<CursorClonableMmap>, GrenadParameters) -> Result<M>
+ Sync
+ Send
+ 'static,
FS: Fn(M::Output) -> TypedChunk + Sync + Send + 'static,
M: MergeableReader + FromParallelIterator<M::Output> + Send + 'static,
M::Output: Send,
FS: Fn(M) -> TypedChunk + Sync + Send + 'static,
M: Send,
{
rayon::spawn(move || {
puffin::profile_scope!("extract_multiple_chunks", name);
let chunks: Result<M> =
chunks.into_par_iter().map(|chunk| extract_fn(chunk, indexer)).collect();
rayon::spawn(move || match chunks {
Ok(chunks) => {
debug!("merge {} database", name);
puffin::profile_scope!("merge_multiple_chunks", name);
let reader = chunks.merge(merge_fn, &indexer);
let _ = lmdb_writer_sx.send(reader.map(serialize_fn));
}
Err(e) => {
let _ = lmdb_writer_sx.send(Err(e));
}
})
});
puffin::profile_scope!("extract_chunk", name);
match extract_fn(chunk, indexer) {
Ok(chunk) => {
let _ = lmdb_writer_sx.send(Ok(serialize_fn(chunk)));
}
Err(e) => {
let _ = lmdb_writer_sx.send(Err(e));
}
}
}

/// Extract chunked data and send it into lmdb_writer_sx sender:

@@ -350,22 +298,13 @@ fn send_and_extract_flattened_documents_data(
faceted_fields: &HashSet<FieldId>,
primary_key_id: FieldId,
geo_fields_ids: Option<(FieldId, FieldId)>,
stop_words: &Option<fst::Set<&[u8]>>,
stop_words: &Option<fst::Set<Vec<u8>>>,
allowed_separators: &Option<&[&str]>,
dictionary: &Option<&[&str]>,
max_positions_per_attributes: Option<u32>,
) -> Result<(
grenad::Reader<CursorClonableMmap>,
(
grenad::Reader<CursorClonableMmap>,
(
grenad::Reader<CursorClonableMmap>,
(
grenad::Reader<BufReader<File>>,
(grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>),
),
),
),
(grenad::Reader<CursorClonableMmap>, grenad::Reader<CursorClonableMmap>),
)> {
let flattened_documents_chunk =
flattened_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;

@@ -436,16 +375,17 @@ fn send_and_extract_flattened_documents_data(
fid_docid_facet_strings_chunk.clone(),
)));

Ok((
fid_docid_facet_numbers_chunk,
(
fid_docid_facet_strings_chunk,
(
fid_facet_is_null_docids_chunk,
(fid_facet_is_empty_docids_chunk, fid_facet_exists_docids_chunk),
),
),
))
let _ = lmdb_writer_sx
.send(Ok(TypedChunk::FieldIdFacetIsNullDocids(fid_facet_is_null_docids_chunk)));

let _ = lmdb_writer_sx.send(Ok(TypedChunk::FieldIdFacetIsEmptyDocids(
fid_facet_is_empty_docids_chunk,
)));

let _ = lmdb_writer_sx
.send(Ok(TypedChunk::FieldIdFacetExistsDocids(fid_facet_exists_docids_chunk)));

Ok((fid_docid_facet_numbers_chunk, fid_docid_facet_strings_chunk))
},
);

@@ -82,90 +82,6 @@ pub unsafe fn as_cloneable_grenad(
Ok(reader)
}

pub trait MergeableReader
where
Self: Sized,
{
type Output;

fn merge(self, merge_fn: MergeFn, indexer: &GrenadParameters) -> Result<Self::Output>;
}

impl MergeableReader for Vec<grenad::Reader<BufReader<File>>> {
type Output = grenad::Reader<BufReader<File>>;

fn merge(self, merge_fn: MergeFn, params: &GrenadParameters) -> Result<Self::Output> {
let mut merger = MergerBuilder::new(merge_fn);
self.into_iter().try_for_each(|r| merger.push(r))?;
merger.finish(params)
}
}

impl MergeableReader for Vec<(grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>)> {
type Output = (grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>);

fn merge(self, merge_fn: MergeFn, params: &GrenadParameters) -> Result<Self::Output> {
let mut m1 = MergerBuilder::new(merge_fn);
let mut m2 = MergerBuilder::new(merge_fn);
for (r1, r2) in self.into_iter() {
m1.push(r1)?;
m2.push(r2)?;
}
Ok((m1.finish(params)?, m2.finish(params)?))
}
}

impl MergeableReader
for Vec<(
grenad::Reader<BufReader<File>>,
grenad::Reader<BufReader<File>>,
grenad::Reader<BufReader<File>>,
)>
{
type Output = (
grenad::Reader<BufReader<File>>,
grenad::Reader<BufReader<File>>,
grenad::Reader<BufReader<File>>,
);

fn merge(self, merge_fn: MergeFn, params: &GrenadParameters) -> Result<Self::Output> {
let mut m1 = MergerBuilder::new(merge_fn);
let mut m2 = MergerBuilder::new(merge_fn);
let mut m3 = MergerBuilder::new(merge_fn);
for (r1, r2, r3) in self.into_iter() {
m1.push(r1)?;
m2.push(r2)?;
m3.push(r3)?;
}
Ok((m1.finish(params)?, m2.finish(params)?, m3.finish(params)?))
}
}

struct MergerBuilder<R>(grenad::MergerBuilder<R, MergeFn>);

impl<R: io::Read + io::Seek> MergerBuilder<R> {
fn new(merge_fn: MergeFn) -> Self {
Self(grenad::MergerBuilder::new(merge_fn))
}

fn push(&mut self, reader: grenad::Reader<R>) -> Result<()> {
self.0.push(reader.into_cursor()?);
Ok(())
}

fn finish(self, params: &GrenadParameters) -> Result<grenad::Reader<BufReader<File>>> {
let merger = self.0.build();
let mut writer = create_writer(
params.chunk_compression_type,
params.chunk_compression_level,
tempfile::tempfile()?,
);
merger.write_into_stream_writer(&mut writer)?;

writer_into_reader(writer)
}
}

#[derive(Debug, Clone, Copy)]
pub struct GrenadParameters {
pub chunk_compression_type: CompressionType,

@@ -10,7 +10,7 @@ use fst::{IntoStreamer, Streamer};
pub use grenad_helpers::{
as_cloneable_grenad, create_sorter, create_writer, grenad_obkv_into_chunks,
merge_ignore_values, sorter_into_reader, write_sorter_into_database, writer_into_reader,
GrenadParameters, MergeableReader,
GrenadParameters,
};
pub use merge_functions::{
keep_first, keep_latest_obkv, merge_btreeset_string, merge_cbo_roaring_bitmaps,

@@ -5,12 +5,13 @@ mod transform;
mod typed_chunk;

use std::collections::{HashMap, HashSet};
use std::io::{Cursor, Read, Seek};
use std::io::{Read, Seek};
use std::iter::FromIterator;
use std::num::NonZeroU32;
use std::result::Result as StdResult;

use crossbeam_channel::{Receiver, Sender};
use grenad::{Merger, MergerBuilder};
use heed::types::Str;
use heed::Database;
use log::debug;

@@ -313,9 +314,6 @@ where
}
};

let original_documents = grenad::Reader::new(original_documents)?;
let flattened_documents = grenad::Reader::new(flattened_documents)?;

// create LMDB writer channel
let (lmdb_writer_sx, lmdb_writer_rx): (
Sender<Result<TypedChunk>>,

@@ -354,11 +352,7 @@ where

let stop_words = self.index.stop_words(self.wtxn)?;
let separators = self.index.allowed_separators(self.wtxn)?;
let separators: Option<Vec<_>> =
separators.as_ref().map(|x| x.iter().map(String::as_str).collect());
let dictionary = self.index.dictionary(self.wtxn)?;
let dictionary: Option<Vec<_>> =
dictionary.as_ref().map(|x| x.iter().map(String::as_str).collect());
let exact_attributes = self.index.exact_attributes_ids(self.wtxn)?;
let proximity_precision = self.index.proximity_precision(self.wtxn)?.unwrap_or_default();

@@ -368,55 +362,77 @@ where
max_memory: self.indexer_config.max_memory,
max_nb_chunks: self.indexer_config.max_nb_chunks, // default value, may be chosen.
};
let documents_chunk_size =
self.indexer_config.documents_chunk_size.unwrap_or(1024 * 1024 * 4); // 4MiB
let documents_chunk_size = match self.indexer_config.documents_chunk_size {
Some(chunk_size) => chunk_size,
None => {
let default_chunk_size = 1024 * 1024 * 4; // 4MiB
let min_chunk_size = 1024 * 512; // 512KiB

// compute the chunk size from the number of available threads and the inputed data size.
let total_size = flattened_documents.metadata().map(|m| m.len());
let current_num_threads = pool.current_num_threads();
total_size
.map_or(default_chunk_size, |size| (size as usize) / current_num_threads)
.max(min_chunk_size)
}
};

let original_documents = grenad::Reader::new(original_documents)?;
let flattened_documents = grenad::Reader::new(flattened_documents)?;

let max_positions_per_attributes = self.indexer_config.max_positions_per_attributes;

let cloned_embedder = self.embedders.clone();

// Run extraction pipeline in parallel.
pool.install(|| {
puffin::profile_scope!("extract_and_send_grenad_chunks");
// split obkv file into several chunks
let original_chunk_iter =
grenad_obkv_into_chunks(original_documents, pool_params, documents_chunk_size);
let stop_words = stop_words.map(|sw| sw.map_data(Vec::from).unwrap());
rayon::spawn(move || {
puffin::profile_scope!("extract_and_send_grenad_chunks");
// split obkv file into several chunks
let original_chunk_iter =
grenad_obkv_into_chunks(original_documents, pool_params, documents_chunk_size);

// split obkv file into several chunks
let flattened_chunk_iter =
grenad_obkv_into_chunks(flattened_documents, pool_params, documents_chunk_size);
// split obkv file into several chunks
let flattened_chunk_iter =
grenad_obkv_into_chunks(flattened_documents, pool_params, documents_chunk_size);

let result = original_chunk_iter.and_then(|original_chunk| {
let flattened_chunk = flattened_chunk_iter?;
// extract all databases from the chunked obkv douments
extract::data_from_obkv_documents(
original_chunk,
flattened_chunk,
pool_params,
lmdb_writer_sx.clone(),
searchable_fields,
faceted_fields,
primary_key_id,
geo_fields_ids,
field_id_map,
stop_words,
separators.as_deref(),
dictionary.as_deref(),
max_positions_per_attributes,
exact_attributes,
proximity_precision,
cloned_embedder,
)
let separators: Option<Vec<_>> =
separators.as_ref().map(|x| x.iter().map(String::as_str).collect());
let dictionary: Option<Vec<_>> =
dictionary.as_ref().map(|x| x.iter().map(String::as_str).collect());
let result = original_chunk_iter.and_then(|original_chunk| {
let flattened_chunk = flattened_chunk_iter?;
// extract all databases from the chunked obkv douments
extract::data_from_obkv_documents(
original_chunk,
flattened_chunk,
pool_params,
lmdb_writer_sx.clone(),
searchable_fields,
faceted_fields,
primary_key_id,
geo_fields_ids,
field_id_map,
stop_words,
separators.as_deref(),
dictionary.as_deref(),
max_positions_per_attributes,
exact_attributes,
proximity_precision,
cloned_embedder,
)
});

if let Err(e) = result {
let _ = lmdb_writer_sx.send(Err(e));
}

// needs to be dropped to avoid channel waiting lock.
drop(lmdb_writer_sx);
});

if let Err(e) = result {
let _ = lmdb_writer_sx.send(Err(e));
}

// needs to be dropped to avoid channel waiting lock.
drop(lmdb_writer_sx);
});

let index_is_empty = self.index.number_of_documents(self.wtxn)? == 0;
let mut final_documents_ids = RoaringBitmap::new();

let mut databases_seen = 0;
@@ -444,12 +460,21 @@ where
word_fid_docids_reader,
} => {
let cloneable_chunk = unsafe { as_cloneable_grenad(&word_docids_reader)? };
word_docids = Some(cloneable_chunk);
let word_docids = word_docids.get_or_insert_with(|| {
MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn)
});
word_docids.push(cloneable_chunk.into_cursor()?);
let cloneable_chunk =
unsafe { as_cloneable_grenad(&exact_word_docids_reader)? };
exact_word_docids = Some(cloneable_chunk);
let exact_word_docids = exact_word_docids.get_or_insert_with(|| {
MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn)
});
exact_word_docids.push(cloneable_chunk.into_cursor()?);
let cloneable_chunk = unsafe { as_cloneable_grenad(&word_fid_docids_reader)? };
word_fid_docids = Some(cloneable_chunk);
let word_fid_docids = word_fid_docids.get_or_insert_with(|| {
MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn)
});
word_fid_docids.push(cloneable_chunk.into_cursor()?);
TypedChunk::WordDocids {
word_docids_reader,
exact_word_docids_reader,

@@ -458,7 +483,10 @@ where
}
TypedChunk::WordPositionDocids(chunk) => {
let cloneable_chunk = unsafe { as_cloneable_grenad(&chunk)? };
word_position_docids = Some(cloneable_chunk);
let word_position_docids = word_position_docids.get_or_insert_with(|| {
MergerBuilder::new(merge_deladd_cbo_roaring_bitmaps as MergeFn)
});
word_position_docids.push(cloneable_chunk.into_cursor()?);
TypedChunk::WordPositionDocids(chunk)
}
TypedChunk::VectorPoints {

@@ -481,7 +509,7 @@ where
};

let (docids, is_merged_database) =
write_typed_chunk_into_index(typed_chunk, self.index, self.wtxn, index_is_empty)?;
write_typed_chunk_into_index(typed_chunk, self.index, self.wtxn)?;
if !docids.is_empty() {
final_documents_ids |= docids;
let documents_seen_count = final_documents_ids.len();

@@ -538,10 +566,10 @@ where
}

self.execute_prefix_databases(
word_docids,
exact_word_docids,
word_position_docids,
word_fid_docids,
word_docids.map(MergerBuilder::build),
exact_word_docids.map(MergerBuilder::build),
word_position_docids.map(MergerBuilder::build),
word_fid_docids.map(MergerBuilder::build),
)?;

Ok(number_of_documents)

@@ -550,10 +578,10 @@ where
#[logging_timer::time("IndexDocuments::{}")]
pub fn execute_prefix_databases(
self,
word_docids: Option<grenad::Reader<CursorClonableMmap>>,
exact_word_docids: Option<grenad::Reader<CursorClonableMmap>>,
word_position_docids: Option<grenad::Reader<CursorClonableMmap>>,
word_fid_docids: Option<grenad::Reader<CursorClonableMmap>>,
word_docids: Option<Merger<CursorClonableMmap, MergeFn>>,
exact_word_docids: Option<Merger<CursorClonableMmap, MergeFn>>,
word_position_docids: Option<Merger<CursorClonableMmap, MergeFn>>,
word_fid_docids: Option<Merger<CursorClonableMmap, MergeFn>>,
) -> Result<()>
where
FP: Fn(UpdateIndexingStep) + Sync,

@@ -728,7 +756,7 @@ where
#[allow(clippy::too_many_arguments)]
fn execute_word_prefix_docids(
txn: &mut heed::RwTxn,
reader: grenad::Reader<Cursor<ClonableMmap>>,
merger: Merger<CursorClonableMmap, MergeFn>,
word_docids_db: Database<Str, CboRoaringBitmapCodec>,
word_prefix_docids_db: Database<Str, CboRoaringBitmapCodec>,
indexer_config: &IndexerConfig,

@@ -738,13 +766,12 @@ fn execute_word_prefix_docids(
) -> Result<()> {
puffin::profile_function!();

let cursor = reader.into_cursor()?;
let mut builder = WordPrefixDocids::new(txn, word_docids_db, word_prefix_docids_db);
builder.chunk_compression_type = indexer_config.chunk_compression_type;
builder.chunk_compression_level = indexer_config.chunk_compression_level;
builder.max_nb_chunks = indexer_config.max_nb_chunks;
builder.max_memory = indexer_config.max_memory;
builder.execute(cursor, new_prefix_fst_words, common_prefix_fst_words, del_prefix_fst_words)?;
builder.execute(merger, new_prefix_fst_words, common_prefix_fst_words, del_prefix_fst_words)?;
Ok(())
}

@@ -7,7 +7,7 @@ use bytemuck::allocation::pod_collect_to_vec;
use charabia::{Language, Script};
use grenad::MergerBuilder;
use heed::types::Bytes;
use heed::{PutFlags, RwTxn};
use heed::RwTxn;
use obkv::{KvReader, KvWriter};
use roaring::RoaringBitmap;

@@ -119,7 +119,6 @@ pub(crate) fn write_typed_chunk_into_index(
typed_chunk: TypedChunk,
index: &Index,
wtxn: &mut RwTxn,
index_is_empty: bool,
) -> Result<(RoaringBitmap, bool)> {
puffin::profile_function!(typed_chunk.to_debug_string());

@@ -172,11 +171,10 @@ pub(crate) fn write_typed_chunk_into_index(
index.put_documents_ids(wtxn, &docids)?;
}
TypedChunk::FieldIdWordCountDocids(fid_word_count_docids_iter) => {
append_entries_into_database(
write_entries_into_database(
fid_word_count_docids_iter,
&index.field_id_word_count_docids,
wtxn,
index_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?;

@@ -188,31 +186,28 @@ pub(crate) fn write_typed_chunk_into_index(
word_fid_docids_reader,
} => {
let word_docids_iter = unsafe { as_cloneable_grenad(&word_docids_reader) }?;
append_entries_into_database(
write_entries_into_database(
word_docids_iter.clone(),
&index.word_docids,
wtxn,
index_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?;

let exact_word_docids_iter = unsafe { as_cloneable_grenad(&exact_word_docids_reader) }?;
append_entries_into_database(
write_entries_into_database(
exact_word_docids_iter.clone(),
&index.exact_word_docids,
wtxn,
index_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?;

let word_fid_docids_iter = unsafe { as_cloneable_grenad(&word_fid_docids_reader) }?;
append_entries_into_database(
write_entries_into_database(
word_fid_docids_iter,
&index.word_fid_docids,
wtxn,
index_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?;

@@ -230,11 +225,10 @@ pub(crate) fn write_typed_chunk_into_index(
is_merged_database = true;
}
TypedChunk::WordPositionDocids(word_position_docids_iter) => {
append_entries_into_database(
write_entries_into_database(
word_position_docids_iter,
&index.word_position_docids,
wtxn,
index_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?;

@@ -251,44 +245,40 @@ pub(crate) fn write_typed_chunk_into_index(
is_merged_database = true;
}
TypedChunk::FieldIdFacetExistsDocids(facet_id_exists_docids) => {
append_entries_into_database(
write_entries_into_database(
facet_id_exists_docids,
&index.facet_id_exists_docids,
wtxn,
index_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?;
is_merged_database = true;
}
TypedChunk::FieldIdFacetIsNullDocids(facet_id_is_null_docids) => {
append_entries_into_database(
write_entries_into_database(
facet_id_is_null_docids,
&index.facet_id_is_null_docids,
wtxn,
index_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?;
is_merged_database = true;
}
TypedChunk::FieldIdFacetIsEmptyDocids(facet_id_is_empty_docids) => {
append_entries_into_database(
write_entries_into_database(
facet_id_is_empty_docids,
&index.facet_id_is_empty_docids,
wtxn,
index_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?;
is_merged_database = true;
}
TypedChunk::WordPairProximityDocids(word_pair_proximity_docids_iter) => {
append_entries_into_database(
write_entries_into_database(
word_pair_proximity_docids_iter,
&index.word_pair_proximity_docids,
wtxn,
index_is_empty,
deladd_serialize_add_side,
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
)?;
@@ -541,7 +531,6 @@ fn write_entries_into_database<R, K, V, FS, FM>(
data: grenad::Reader<R>,
database: &heed::Database<K, V>,
wtxn: &mut RwTxn,
index_is_empty: bool,
serialize_value: FS,
merge_values: FM,
) -> Result<()>

@@ -559,13 +548,9 @@ where
while let Some((key, value)) = cursor.move_on_next()? {
if valid_lmdb_key(key) {
buffer.clear();
let value = if index_is_empty {
Some(serialize_value(value, &mut buffer)?)
} else {
match database.get(wtxn, key)? {
Some(prev_value) => merge_values(value, prev_value, &mut buffer)?,
None => Some(serialize_value(value, &mut buffer)?),
}
let value = match database.get(wtxn, key)? {
Some(prev_value) => merge_values(value, prev_value, &mut buffer)?,
None => Some(serialize_value(value, &mut buffer)?),
};
match value {
Some(value) => database.put(wtxn, key, value)?,

@@ -578,58 +563,3 @@ where

Ok(())
}

/// Write provided entries in database using serialize_value function.
/// merge_values function is used if an entry already exist in the database.
/// All provided entries must be ordered.
/// If the index is not empty, write_entries_into_database is called instead.
fn append_entries_into_database<R, K, V, FS, FM>(
data: grenad::Reader<R>,
database: &heed::Database<K, V>,
wtxn: &mut RwTxn,
index_is_empty: bool,
serialize_value: FS,
merge_values: FM,
) -> Result<()>
where
R: io::Read + io::Seek,
FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>,
FM: for<'a> Fn(&[u8], &[u8], &'a mut Vec<u8>) -> Result<Option<&'a [u8]>>,
K: for<'a> heed::BytesDecode<'a>,
{
puffin::profile_function!(format!("number of entries: {}", data.len()));

if !index_is_empty {
return write_entries_into_database(
data,
database,
wtxn,
false,
serialize_value,
merge_values,
);
}

let mut buffer = Vec::new();
let mut database = database.iter_mut(wtxn)?.remap_types::<Bytes, Bytes>();

let mut cursor = data.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
if valid_lmdb_key(key) {
debug_assert!(
K::bytes_decode(key).is_ok(),
"Couldn't decode key with the database decoder, key length: {} - key bytes: {:x?}",
key.len(),
&key
);
buffer.clear();
let value = serialize_value(value, &mut buffer)?;
unsafe {
// safety: We do not keep a reference to anything that lives inside the database
database.put_current_with_options::<Bytes>(PutFlags::APPEND, key, value)?
};
}
}

Ok(())
}

@@ -42,7 +42,7 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> {
#[logging_timer::time("WordPrefixDocids::{}")]
pub fn execute(
self,
mut new_word_docids_iter: grenad::ReaderCursor<CursorClonableMmap>,
new_word_docids: grenad::Merger<CursorClonableMmap, MergeFn>,
new_prefix_fst_words: &[String],
common_prefix_fst_words: &[&[String]],
del_prefix_fst_words: &HashSet<Vec<u8>>,

@@ -63,7 +63,8 @@ impl<'t, 'i> WordPrefixDocids<'t, 'i> {
if !common_prefix_fst_words.is_empty() {
let mut current_prefixes: Option<&&[String]> = None;
let mut prefixes_cache = HashMap::new();
while let Some((word, data)) = new_word_docids_iter.move_on_next()? {
let mut new_word_docids_iter = new_word_docids.into_stream_merger_iter()?;
while let Some((word, data)) = new_word_docids_iter.next()? {
current_prefixes = match current_prefixes.take() {
Some(prefixes) if word.starts_with(prefixes[0].as_bytes()) => Some(prefixes),
_otherwise => {

@@ -47,7 +47,7 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
#[logging_timer::time("WordPrefixIntegerDocids::{}")]
pub fn execute(
self,
new_word_integer_docids: grenad::Reader<CursorClonableMmap>,
new_word_integer_docids: grenad::Merger<CursorClonableMmap, MergeFn>,
new_prefix_fst_words: &[String],
common_prefix_fst_words: &[&[String]],
del_prefix_fst_words: &HashSet<Vec<u8>>,

@@ -64,14 +64,14 @@ impl<'t, 'i> WordPrefixIntegerDocids<'t, 'i> {
self.max_memory,
);

let mut new_word_integer_docids_iter = new_word_integer_docids.into_cursor()?;

if !common_prefix_fst_words.is_empty() {
// We fetch all the new common prefixes between the previous and new prefix fst.
let mut buffer = Vec::new();
let mut current_prefixes: Option<&&[String]> = None;
let mut prefixes_cache = HashMap::new();
while let Some((key, data)) = new_word_integer_docids_iter.move_on_next()? {
let mut new_word_integer_docids_iter =
new_word_integer_docids.into_stream_merger_iter()?;
while let Some((key, data)) = new_word_integer_docids_iter.next()? {
let (word, pos) =
StrBEU16Codec::bytes_decode(key).map_err(heed::Error::Decoding)?;