Compare commits

..

1 Commits

Author SHA1 Message Date
Tamo
fd93a704a6 use a bufreader everytime there is a grenad<file> 2023-09-28 16:26:01 +02:00
35 changed files with 564 additions and 674 deletions

View File

@@ -74,4 +74,4 @@ jobs:
echo "${{ steps.file.outputs.basename }}.json has just been pushed."
echo 'How to compare this benchmark with another one?'
echo ' - Check the available files with: ./benchmarks/scripts/list.sh'
echo " - Run the following command: ./benchmaks/scripts/compare.sh <file-to-compare-with> ${{ steps.file.outputs.basename }}.json"
echo " - Run the following command: ./benchmaks/scipts/compare.sh <file-to-compare-with> ${{ steps.file.outputs.basename }}.json"

View File

@@ -57,10 +57,10 @@ jobs:
echo "date=$commit_date" >> $GITHUB_OUTPUT
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@v2
- name: Login to Docker Hub
uses: docker/login-action@v2
@@ -70,7 +70,7 @@ jobs:
- name: Docker meta
id: meta
uses: docker/metadata-action@v5
uses: docker/metadata-action@v4
with:
images: getmeili/meilisearch
# Prevent `latest` to be updated for each new tag pushed.
@@ -83,7 +83,7 @@ jobs:
type=raw,value=latest,enable=${{ steps.check-tag-format.outputs.stable == 'true' && steps.check-tag-format.outputs.latest == 'true' }}
- name: Build and push
uses: docker/build-push-action@v5
uses: docker/build-push-action@v4
with:
push: true
platforms: linux/amd64,linux/arm64

View File

@@ -1,81 +0,0 @@
name: Benchmarks (PR)
on: issue_comment
permissions:
issues: write
env:
GH_TOKEN: ${{ secrets.MEILI_BOT_GH_PAT }}
jobs:
run-benchmarks-on-comment:
name: Run and upload benchmarks
runs-on: benchmarks
timeout-minutes: 4320 # 72h
steps:
- uses: actions/checkout@v3
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: stable
override: true
- name: Check for Command
id: command
uses: xt0rted/slash-command-action@v2
with:
command: benchmark
reaction-type: "eyes"
# Set variables
- name: Set current branch name
shell: bash
run: echo "name=$(echo ${GITHUB_REF#refs/heads/})" >> $GITHUB_OUTPUT
id: current_branch
- name: Set normalized current branch name # Replace `/` by `_` in branch name to avoid issues when pushing to S3
shell: bash
run: echo "name=$(echo ${GITHUB_REF#refs/heads/} | tr '/' '_')" >> $GITHUB_OUTPUT
id: normalized_current_branch
- name: Set shorter commit SHA
shell: bash
run: echo "short=$(echo $GITHUB_SHA | cut -c1-8)" >> $GITHUB_OUTPUT
id: commit_sha
- name: Set file basename with format "dataset_branch_commitSHA"
shell: bash
run: echo "basename=$(echo ${{ steps.command.outputs.command-arguments }}_${{ steps.normalized_current_branch.outputs.name }}_${{ steps.commit_sha.outputs.short }})" >> $GITHUB_OUTPUT
id: file
# Run benchmarks
- name: Run benchmarks - Dataset ${{ steps.command.outputs.command-arguments }} - Branch ${{ steps.current_branch.outputs.name }} - Commit ${{ steps.commit_sha.outputs.short }}
run: |
cd benchmarks
cargo bench --bench ${{ steps.command.outputs.command-arguments }} -- --save-baseline ${{ steps.file.outputs.basename }}
# Generate critcmp files
- name: Install critcmp
uses: taiki-e/install-action@v2
with:
tool: critcmp
- name: Export cripcmp file
run: |
critcmp --export ${{ steps.file.outputs.basename }} > ${{ steps.file.outputs.basename }}.json
# Upload benchmarks
- name: Upload ${{ steps.file.outputs.basename }}.json to DO Spaces # DigitalOcean Spaces = S3
uses: BetaHuhn/do-spaces-action@v2
with:
access_key: ${{ secrets.DO_SPACES_ACCESS_KEY }}
secret_key: ${{ secrets.DO_SPACES_SECRET_KEY }}
space_name: ${{ secrets.DO_SPACES_SPACE_NAME }}
space_region: ${{ secrets.DO_SPACES_SPACE_REGION }}
source: ${{ steps.file.outputs.basename }}.json
out_dir: critcmp_results
# Compute the diff of the benchmarks and send a message on the GitHub PR
- name: Compute and send a message in the PR
run: |
export base=git rev-parse $(git cherry main | head -n 1 | cut -c 3-)~ | cut -c -8
echo 'Here are your benchmarks diff 👊' >> body.txt
echo '```' >> body.txt
./benchmaks/scipts/compare.sh $base ${{ steps.file.outputs.basename }}.json >> body.txt
echo '```' >> body.txt
gh pr comment ${GITHUB_REF#refs/heads/} --body-file body.txt

898
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -21,7 +21,7 @@ serde_json = { version = "1.0.95", features = ["preserve_order"] }
criterion = { version = "0.5.1", features = ["html_reports"] }
rand = "0.8.5"
rand_chacha = "0.3.1"
roaring = { path = "../../roaring-rs" }
roaring = "0.10.1"
[build-dependencies]
anyhow = "1.0.70"

View File

@@ -19,7 +19,7 @@ meilisearch-auth = { path = "../meilisearch-auth" }
meilisearch-types = { path = "../meilisearch-types" }
once_cell = "1.17.1"
regex = "1.7.3"
roaring = { path = "../../roaring-rs", features = ["serde"] }
roaring = { version = "0.10.1", features = ["serde"] }
serde = { version = "1.0.160", features = ["derive"] }
serde_json = { version = "1.0.95", features = ["preserve_order"] }
tar = "0.4.38"

View File

@@ -23,7 +23,7 @@ meilisearch-auth = { path = "../meilisearch-auth" }
meilisearch-types = { path = "../meilisearch-types" }
page_size = "0.5.0"
puffin = "0.16.0"
roaring = { path = "../../roaring-rs", features = ["serde"] }
roaring = { version = "0.10.1", features = ["serde"] }
serde = { version = "1.0.160", features = ["derive"] }
serde_json = { version = "1.0.95", features = ["preserve_order"] }
synchronoise = "1.0.1"

View File

@@ -17,7 +17,7 @@ hmac = "0.12.1"
maplit = "1.0.2"
meilisearch-types = { path = "../meilisearch-types" }
rand = "0.8.5"
roaring = { path = "../../roaring-rs", features = ["serde"] }
roaring = { version = "0.10.1", features = ["serde"] }
serde = { version = "1.0.160", features = ["derive"] }
serde_json = { version = "1.0.95", features = ["preserve_order"] }
sha2 = "0.10.6"

View File

@@ -23,7 +23,7 @@ flate2 = "1.0.25"
fst = "0.4.7"
memmap2 = "0.7.1"
milli = { path = "../milli" }
roaring = { path = "../../roaring-rs", features = ["serde"] }
roaring = { version = "0.10.1", features = ["serde"] }
serde = { version = "1.0.160", features = ["derive"] }
serde-cs = "0.2.4"
serde_json = "1.0.95"

View File

@@ -42,7 +42,7 @@ once_cell = "1.17.1"
ordered-float = "3.6.0"
rand_pcg = { version = "0.3.1", features = ["serde1"] }
rayon = "1.7.0"
roaring = { path = "../../roaring-rs" }
roaring = "0.10.1"
rstar = { version = "0.11.0", features = ["serde"] }
serde = { version = "1.0.160", features = ["derive"] }
serde_json = { version = "1.0.95", features = ["preserve_order"] }

View File

@@ -1,4 +1,5 @@
use std::fs::File;
use std::io::BufReader;
use std::{io, str};
use obkv::KvReader;
@@ -19,14 +20,14 @@ use crate::FieldId;
pub struct EnrichedDocumentsBatchReader<R> {
documents: DocumentsBatchReader<R>,
primary_key: String,
external_ids: grenad::ReaderCursor<File>,
external_ids: grenad::ReaderCursor<BufReader<File>>,
}
impl<R: io::Read + io::Seek> EnrichedDocumentsBatchReader<R> {
pub fn new(
documents: DocumentsBatchReader<R>,
primary_key: String,
external_ids: grenad::Reader<File>,
external_ids: grenad::Reader<BufReader<File>>,
) -> Result<Self, Error> {
if documents.documents_count() as u64 == external_ids.len() {
Ok(EnrichedDocumentsBatchReader {
@@ -75,7 +76,7 @@ pub struct EnrichedDocument<'a> {
pub struct EnrichedDocumentsBatchCursor<R> {
documents: DocumentsBatchCursor<R>,
primary_key: String,
external_ids: grenad::ReaderCursor<File>,
external_ids: grenad::ReaderCursor<BufReader<File>>,
}
impl<R> EnrichedDocumentsBatchCursor<R> {

View File

@@ -47,6 +47,8 @@ pub enum InternalError {
IndexingMergingKeys { process: &'static str },
#[error("{}", HeedError::InvalidDatabaseTyping)]
InvalidDatabaseTyping,
#[error("Could not access the inner of a buf-reader/writer: {0}")]
BufIntoInnerError(String),
#[error(transparent)]
RayonThreadPool(#[from] ThreadPoolBuildError),
#[error(transparent)]

View File

@@ -1,5 +1,4 @@
use std::borrow::Cow;
use std::convert::TryInto;
use std::io;
use std::mem::size_of;
@@ -57,30 +56,22 @@ impl CboRoaringBitmapCodec {
}
/// Merge serialized CboRoaringBitmaps in a buffer.
/// The buffer must be empty before calling the function.
///
/// if the merged values length is under the threshold, values are directly
/// serialized in the buffer else a RoaringBitmap is created from the
/// values and is serialized in the buffer.
pub fn merge_into(slices: &[Cow<[u8]>], buffer: &mut Vec<u8>) -> io::Result<()> {
debug_assert!(buffer.is_empty());
let mut roaring = RoaringBitmap::new();
let mut vec = Vec::new();
for bytes in slices {
if bytes.len() <= THRESHOLD * size_of::<u32>() {
debug_assert!(bytes.len() % size_of::<u32>() == 0);
vec.reserve(bytes.len() / size_of::<u32>());
for bytes in bytes.chunks_exact(size_of::<u32>()) {
// unwrap can't happens since we ensured that everything
// was a multiple of size_of<u32>.
let v = u32::from_ne_bytes(bytes.try_into().unwrap());
vec.push(v);
let mut reader = bytes.as_ref();
while let Ok(integer) = reader.read_u32::<NativeEndian>() {
vec.push(integer);
}
} else {
roaring.union_with_serialized_unchecked(bytes.as_ref())?;
roaring |= RoaringBitmap::deserialize_unchecked_from(bytes.as_ref())?;
}
}
@@ -94,7 +85,7 @@ impl CboRoaringBitmapCodec {
}
} else {
// We can unwrap safely because the vector is sorted upper.
let roaring = RoaringBitmap::from_sorted_iter(vec).unwrap();
let roaring = RoaringBitmap::from_sorted_iter(vec.into_iter()).unwrap();
roaring.serialize_into(buffer)?;
}
} else {
@@ -195,11 +186,8 @@ mod tests {
let medium_data: Vec<_> =
medium_data.iter().map(|b| CboRoaringBitmapCodec::bytes_encode(b).unwrap()).collect();
// TODO: used for profiling purpose, get rids of it once the function is optimized
for _ in 0..100000 {
buffer.clear();
CboRoaringBitmapCodec::merge_into(medium_data.as_slice(), &mut buffer).unwrap();
}
buffer.clear();
CboRoaringBitmapCodec::merge_into(medium_data.as_slice(), &mut buffer).unwrap();
let bitmap = CboRoaringBitmapCodec::deserialize_from(&buffer).unwrap();
let expected = RoaringBitmap::from_sorted_iter(0..23).unwrap();

View File

@@ -1,6 +1,5 @@
#![cfg_attr(all(test, fuzzing), feature(no_coverage))]
#![allow(clippy::type_complexity)]
#![feature(test)]
#[cfg(test)]
#[global_allocator]

View File

@@ -351,5 +351,5 @@ fn test_redacted() {
.map(|scores| score_details::ScoreDetails::to_json_map(scores.iter()))
.collect();
insta::assert_snapshot!(format!("{documents_ids:?}"), @"[0, 2, 4, 5, 22, 23, 13, 1, 3, 12, 21, 11, 20, 6, 7, 8, 9, 10, 14, 15]");
// insta::assert_json_snapshot!(document_scores_json);
insta::assert_json_snapshot!(document_scores_json);
}

View File

@@ -1,5 +1,6 @@
use std::borrow::Cow;
use std::fs::File;
use std::io::BufReader;
use grenad::CompressionType;
use heed::types::ByteSlice;
@@ -30,7 +31,7 @@ pub struct FacetsUpdateBulk<'i> {
facet_type: FacetType,
field_ids: Vec<FieldId>,
// None if level 0 does not need to be updated
new_data: Option<grenad::Reader<File>>,
new_data: Option<grenad::Reader<BufReader<File>>>,
}
impl<'i> FacetsUpdateBulk<'i> {
@@ -38,7 +39,7 @@ impl<'i> FacetsUpdateBulk<'i> {
index: &'i Index,
field_ids: Vec<FieldId>,
facet_type: FacetType,
new_data: grenad::Reader<File>,
new_data: grenad::Reader<BufReader<File>>,
group_size: u8,
min_level_size: u8,
) -> FacetsUpdateBulk<'i> {
@@ -187,7 +188,7 @@ impl<R: std::io::Read + std::io::Seek> FacetsUpdateBulkInner<R> {
&self,
field_id: FieldId,
txn: &RoTxn,
) -> Result<(Vec<grenad::Reader<File>>, RoaringBitmap)> {
) -> Result<(Vec<grenad::Reader<BufReader<File>>>, RoaringBitmap)> {
let mut all_docids = RoaringBitmap::new();
let subwriters = self.compute_higher_levels(txn, field_id, 32, &mut |bitmaps, _| {
for bitmap in bitmaps {
@@ -259,7 +260,7 @@ impl<R: std::io::Read + std::io::Seek> FacetsUpdateBulkInner<R> {
field_id: u16,
level: u8,
handle_group: &mut dyn FnMut(&[RoaringBitmap], &'t [u8]) -> Result<()>,
) -> Result<Vec<grenad::Reader<File>>> {
) -> Result<Vec<grenad::Reader<BufReader<File>>>> {
if level == 0 {
self.read_level_0(rtxn, field_id, handle_group)?;
// Level 0 is already in the database

View File

@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::fs::File;
use std::io::BufReader;
use heed::types::{ByteSlice, DecodeIgnore};
use heed::{BytesDecode, Error, RoTxn, RwTxn};
@@ -34,14 +35,14 @@ pub struct FacetsUpdateIncremental<'i> {
index: &'i Index,
inner: FacetsUpdateIncrementalInner,
facet_type: FacetType,
new_data: grenad::Reader<File>,
new_data: grenad::Reader<BufReader<File>>,
}
impl<'i> FacetsUpdateIncremental<'i> {
pub fn new(
index: &'i Index,
facet_type: FacetType,
new_data: grenad::Reader<File>,
new_data: grenad::Reader<BufReader<File>>,
group_size: u8,
min_level_size: u8,
max_group_size: u8,

View File

@@ -78,6 +78,7 @@ pub const FACET_MIN_LEVEL_SIZE: u8 = 5;
use std::collections::BTreeSet;
use std::fs::File;
use std::io::BufReader;
use std::iter::FromIterator;
use charabia::normalizer::{Normalize, NormalizerOption};
@@ -108,13 +109,17 @@ pub struct FacetsUpdate<'i> {
index: &'i Index,
database: heed::Database<FacetGroupKeyCodec<ByteSliceRefCodec>, FacetGroupValueCodec>,
facet_type: FacetType,
new_data: grenad::Reader<File>,
new_data: grenad::Reader<BufReader<File>>,
group_size: u8,
max_group_size: u8,
min_level_size: u8,
}
impl<'i> FacetsUpdate<'i> {
pub fn new(index: &'i Index, facet_type: FacetType, new_data: grenad::Reader<File>) -> Self {
pub fn new(
index: &'i Index,
facet_type: FacetType,
new_data: grenad::Reader<BufReader<File>>,
) -> Self {
let database = match facet_type {
FacetType::String => index
.facet_id_string_docids

View File

@@ -1,4 +1,4 @@
use std::io::{Read, Seek};
use std::io::{BufWriter, Read, Seek};
use std::result::Result as StdResult;
use std::{fmt, iter};
@@ -35,7 +35,7 @@ pub fn enrich_documents_batch<R: Read + Seek>(
let (mut cursor, mut documents_batch_index) = reader.into_cursor_and_fields_index();
let mut external_ids = tempfile::tempfile().map(grenad::Writer::new)?;
let mut external_ids = tempfile::tempfile().map(BufWriter::new).map(grenad::Writer::new)?;
let mut uuid_buffer = [0; uuid::fmt::Hyphenated::LENGTH];
// The primary key *field id* that has already been set for this index or the one

View File

@@ -1,6 +1,7 @@
use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
use std::fs::File;
use std::io::BufReader;
use std::{io, mem, str};
use charabia::{Language, Script, SeparatorKind, Token, TokenKind, Tokenizer, TokenizerBuilder};
@@ -31,7 +32,7 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
allowed_separators: Option<&[&str]>,
dictionary: Option<&[&str]>,
max_positions_per_attributes: Option<u32>,
) -> Result<(RoaringBitmap, grenad::Reader<File>, ScriptLanguageDocidsMap)> {
) -> Result<(RoaringBitmap, grenad::Reader<BufReader<File>>, ScriptLanguageDocidsMap)> {
puffin::profile_function!();
let max_positions_per_attributes = max_positions_per_attributes

View File

@@ -1,5 +1,5 @@
use std::fs::File;
use std::io;
use std::io::{self, BufReader};
use heed::{BytesDecode, BytesEncode};
@@ -19,7 +19,7 @@ use crate::Result;
pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
docid_fid_facet_number: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
) -> Result<grenad::Reader<BufReader<File>>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();

View File

@@ -1,5 +1,5 @@
use std::fs::File;
use std::io;
use std::io::{self, BufReader};
use heed::BytesEncode;
@@ -17,7 +17,7 @@ use crate::{FieldId, Result, MAX_FACET_VALUE_LENGTH};
pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
docid_fid_facet_string: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
) -> Result<grenad::Reader<BufReader<File>>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();

View File

@@ -1,7 +1,7 @@
use std::collections::{BTreeMap, HashSet};
use std::convert::TryInto;
use std::fs::File;
use std::io;
use std::io::{self, BufReader};
use std::mem::size_of;
use heed::zerocopy::AsBytes;
@@ -17,11 +17,11 @@ use crate::{CboRoaringBitmapCodec, DocumentId, FieldId, Result, BEU32, MAX_FACET
/// The extracted facet values stored in grenad files by type.
pub struct ExtractedFacetValues {
pub docid_fid_facet_numbers_chunk: grenad::Reader<File>,
pub docid_fid_facet_strings_chunk: grenad::Reader<File>,
pub fid_facet_is_null_docids_chunk: grenad::Reader<File>,
pub fid_facet_is_empty_docids_chunk: grenad::Reader<File>,
pub fid_facet_exists_docids_chunk: grenad::Reader<File>,
pub docid_fid_facet_numbers_chunk: grenad::Reader<BufReader<File>>,
pub docid_fid_facet_strings_chunk: grenad::Reader<BufReader<File>>,
pub fid_facet_is_null_docids_chunk: grenad::Reader<BufReader<File>>,
pub fid_facet_is_empty_docids_chunk: grenad::Reader<BufReader<File>>,
pub fid_facet_exists_docids_chunk: grenad::Reader<BufReader<File>>,
}
/// Extracts the facet values of each faceted field of each document.

View File

@@ -1,6 +1,6 @@
use std::collections::HashMap;
use std::fs::File;
use std::io;
use std::io::{self, BufReader};
use grenad::Sorter;
@@ -21,7 +21,7 @@ use crate::{relative_from_absolute_position, DocumentId, FieldId, Result};
pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
) -> Result<grenad::Reader<BufReader<File>>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();

View File

@@ -1,5 +1,5 @@
use std::fs::File;
use std::io;
use std::io::{self, BufReader};
use concat_arrays::concat_arrays;
use serde_json::Value;
@@ -18,7 +18,7 @@ pub fn extract_geo_points<R: io::Read + io::Seek>(
indexer: GrenadParameters,
primary_key_id: FieldId,
(lat_fid, lng_fid): (FieldId, FieldId),
) -> Result<grenad::Reader<File>> {
) -> Result<grenad::Reader<BufReader<File>>> {
puffin::profile_function!();
let mut writer = create_writer(

View File

@@ -1,6 +1,6 @@
use std::convert::TryFrom;
use std::fs::File;
use std::io;
use std::io::{self, BufReader};
use bytemuck::cast_slice;
use serde_json::{from_slice, Value};
@@ -18,7 +18,7 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
indexer: GrenadParameters,
primary_key_id: FieldId,
vectors_fid: FieldId,
) -> Result<grenad::Reader<File>> {
) -> Result<grenad::Reader<BufReader<File>>> {
puffin::profile_function!();
let mut writer = create_writer(

View File

@@ -1,6 +1,6 @@
use std::collections::HashSet;
use std::fs::File;
use std::io;
use std::io::{self, BufReader};
use std::iter::FromIterator;
use roaring::RoaringBitmap;
@@ -26,7 +26,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters,
exact_attributes: &HashSet<FieldId>,
) -> Result<(grenad::Reader<File>, grenad::Reader<File>)> {
) -> Result<(grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>)> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();

View File

@@ -1,5 +1,5 @@
use std::fs::File;
use std::io;
use std::io::{self, BufReader};
use super::helpers::{
create_sorter, merge_cbo_roaring_bitmaps, read_u32_ne_bytes, sorter_into_reader,
@@ -14,7 +14,7 @@ use crate::{relative_from_absolute_position, DocumentId, Result};
pub fn extract_word_fid_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
) -> Result<grenad::Reader<BufReader<File>>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();

View File

@@ -1,6 +1,7 @@
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap};
use std::fs::File;
use std::io::BufReader;
use std::{cmp, io, mem, str, vec};
use super::helpers::{
@@ -20,7 +21,7 @@ use crate::{DocumentId, Result};
pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
) -> Result<grenad::Reader<BufReader<File>>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();

View File

@@ -1,5 +1,5 @@
use std::fs::File;
use std::io;
use std::io::{self, BufReader};
use super::helpers::{
create_sorter, merge_cbo_roaring_bitmaps, read_u32_ne_bytes, sorter_into_reader,
@@ -17,7 +17,7 @@ use crate::{bucketed_position, relative_from_absolute_position, DocumentId, Resu
pub fn extract_word_position_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
) -> Result<grenad::Reader<BufReader<File>>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();

View File

@@ -12,6 +12,7 @@ mod extract_word_position_docids;
use std::collections::HashSet;
use std::fs::File;
use std::io::BufReader;
use crossbeam_channel::Sender;
use log::debug;
@@ -39,8 +40,8 @@ use crate::{FieldId, Result};
/// Send data in grenad file over provided Sender.
#[allow(clippy::too_many_arguments)]
pub(crate) fn data_from_obkv_documents(
original_obkv_chunks: impl Iterator<Item = Result<grenad::Reader<File>>> + Send,
flattened_obkv_chunks: impl Iterator<Item = Result<grenad::Reader<File>>> + Send,
original_obkv_chunks: impl Iterator<Item = Result<grenad::Reader<BufReader<File>>>> + Send,
flattened_obkv_chunks: impl Iterator<Item = Result<grenad::Reader<BufReader<File>>>> + Send,
indexer: GrenadParameters,
lmdb_writer_sx: Sender<Result<TypedChunk>>,
searchable_fields: Option<HashSet<FieldId>>,
@@ -152,7 +153,7 @@ pub(crate) fn data_from_obkv_documents(
});
}
spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
docid_word_positions_chunks.clone(),
indexer,
lmdb_writer_sx.clone(),
@@ -162,7 +163,7 @@ pub(crate) fn data_from_obkv_documents(
"word-pair-proximity-docids",
);
spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
docid_word_positions_chunks.clone(),
indexer,
lmdb_writer_sx.clone(),
@@ -172,7 +173,11 @@ pub(crate) fn data_from_obkv_documents(
"field-id-wordcount-docids",
);
spawn_extraction_task::<_, _, Vec<(grenad::Reader<File>, grenad::Reader<File>)>>(
spawn_extraction_task::<
_,
_,
Vec<(grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>)>,
>(
docid_word_positions_chunks.clone(),
indexer,
lmdb_writer_sx.clone(),
@@ -185,7 +190,7 @@ pub(crate) fn data_from_obkv_documents(
"word-docids",
);
spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
docid_word_positions_chunks.clone(),
indexer,
lmdb_writer_sx.clone(),
@@ -194,7 +199,7 @@ pub(crate) fn data_from_obkv_documents(
TypedChunk::WordPositionDocids,
"word-position-docids",
);
spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
docid_word_positions_chunks,
indexer,
lmdb_writer_sx.clone(),
@@ -204,7 +209,7 @@ pub(crate) fn data_from_obkv_documents(
"word-fid-docids",
);
spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
docid_fid_facet_strings_chunks,
indexer,
lmdb_writer_sx.clone(),
@@ -214,7 +219,7 @@ pub(crate) fn data_from_obkv_documents(
"field-id-facet-string-docids",
);
spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
docid_fid_facet_numbers_chunks,
indexer,
lmdb_writer_sx,
@@ -269,7 +274,7 @@ fn spawn_extraction_task<FE, FS, M>(
/// Extract chunked data and send it into lmdb_writer_sx sender:
/// - documents
fn send_original_documents_data(
original_documents_chunk: Result<grenad::Reader<File>>,
original_documents_chunk: Result<grenad::Reader<BufReader<File>>>,
indexer: GrenadParameters,
lmdb_writer_sx: Sender<Result<TypedChunk>>,
vectors_field_id: Option<FieldId>,
@@ -311,7 +316,7 @@ fn send_original_documents_data(
#[allow(clippy::too_many_arguments)]
#[allow(clippy::type_complexity)]
fn send_and_extract_flattened_documents_data(
flattened_documents_chunk: Result<grenad::Reader<File>>,
flattened_documents_chunk: Result<grenad::Reader<BufReader<File>>>,
indexer: GrenadParameters,
lmdb_writer_sx: Sender<Result<TypedChunk>>,
searchable_fields: &Option<HashSet<FieldId>>,
@@ -328,7 +333,10 @@ fn send_and_extract_flattened_documents_data(
grenad::Reader<CursorClonableMmap>,
(
grenad::Reader<CursorClonableMmap>,
(grenad::Reader<File>, (grenad::Reader<File>, grenad::Reader<File>)),
(
grenad::Reader<BufReader<File>>,
(grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>),
),
),
),
)> {

View File

@@ -1,6 +1,6 @@
use std::borrow::Cow;
use std::fs::File;
use std::io::{self, Seek};
use std::io::{self, BufReader, BufWriter, Seek};
use std::time::Instant;
use grenad::{CompressionType, Sorter};
@@ -17,13 +17,13 @@ pub fn create_writer<R: io::Write>(
typ: grenad::CompressionType,
level: Option<u32>,
file: R,
) -> grenad::Writer<R> {
) -> grenad::Writer<BufWriter<R>> {
let mut builder = grenad::Writer::builder();
builder.compression_type(typ);
if let Some(level) = level {
builder.compression_level(level);
}
builder.build(file)
builder.build(BufWriter::new(file))
}
pub fn create_sorter(
@@ -53,7 +53,7 @@ pub fn create_sorter(
pub fn sorter_into_reader(
sorter: grenad::Sorter<MergeFn>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
) -> Result<grenad::Reader<BufReader<File>>> {
let mut writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
@@ -64,16 +64,21 @@ pub fn sorter_into_reader(
writer_into_reader(writer)
}
pub fn writer_into_reader(writer: grenad::Writer<File>) -> Result<grenad::Reader<File>> {
let mut file = writer.into_inner()?;
pub fn writer_into_reader(
writer: grenad::Writer<BufWriter<File>>,
) -> Result<grenad::Reader<BufReader<File>>> {
let mut file = writer
.into_inner()?
.into_inner()
.map_err(|err| InternalError::BufIntoInnerError(err.to_string()))?;
file.rewind()?;
grenad::Reader::new(file).map_err(Into::into)
grenad::Reader::new(BufReader::new(file)).map_err(Into::into)
}
pub unsafe fn as_cloneable_grenad(
reader: &grenad::Reader<File>,
reader: &grenad::Reader<BufReader<File>>,
) -> Result<grenad::Reader<CursorClonableMmap>> {
let file = reader.get_ref();
let file = reader.get_ref().get_ref();
let mmap = memmap2::Mmap::map(file)?;
let cursor = io::Cursor::new(ClonableMmap::from(mmap));
let reader = grenad::Reader::new(cursor)?;
@@ -89,8 +94,8 @@ where
fn merge(self, merge_fn: MergeFn, indexer: &GrenadParameters) -> Result<Self::Output>;
}
impl MergeableReader for Vec<grenad::Reader<File>> {
type Output = grenad::Reader<File>;
impl MergeableReader for Vec<grenad::Reader<BufReader<File>>> {
type Output = grenad::Reader<BufReader<File>>;
fn merge(self, merge_fn: MergeFn, params: &GrenadParameters) -> Result<Self::Output> {
let mut merger = MergerBuilder::new(merge_fn);
@@ -99,8 +104,8 @@ impl MergeableReader for Vec<grenad::Reader<File>> {
}
}
impl MergeableReader for Vec<(grenad::Reader<File>, grenad::Reader<File>)> {
type Output = (grenad::Reader<File>, grenad::Reader<File>);
impl MergeableReader for Vec<(grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>)> {
type Output = (grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>);
fn merge(self, merge_fn: MergeFn, params: &GrenadParameters) -> Result<Self::Output> {
let mut m1 = MergerBuilder::new(merge_fn);
@@ -125,7 +130,7 @@ impl<R: io::Read + io::Seek> MergerBuilder<R> {
Ok(())
}
fn finish(self, params: &GrenadParameters) -> Result<grenad::Reader<File>> {
fn finish(self, params: &GrenadParameters) -> Result<grenad::Reader<BufReader<File>>> {
let merger = self.0.build();
let mut writer = create_writer(
params.chunk_compression_type,
@@ -176,7 +181,7 @@ pub fn grenad_obkv_into_chunks<R: io::Read + io::Seek>(
reader: grenad::Reader<R>,
indexer: GrenadParameters,
documents_chunk_size: usize,
) -> Result<impl Iterator<Item = Result<grenad::Reader<File>>>> {
) -> Result<impl Iterator<Item = Result<grenad::Reader<BufReader<File>>>>> {
let mut continue_reading = true;
let mut cursor = reader.into_cursor()?;

View File

@@ -659,8 +659,12 @@ impl<'a, 'i> Transform<'a, 'i> {
new_documents_ids: self.new_documents_ids,
replaced_documents_ids: self.replaced_documents_ids,
documents_count: self.documents_count,
original_documents,
flattened_documents,
original_documents: original_documents
.into_inner()
.map_err(|err| InternalError::BufIntoInnerError(err.to_string()))?,
flattened_documents: flattened_documents
.into_inner()
.map_err(|err| InternalError::BufIntoInnerError(err.to_string()))?,
})
}
@@ -779,8 +783,12 @@ impl<'a, 'i> Transform<'a, 'i> {
new_documents_ids: documents_ids,
replaced_documents_ids: RoaringBitmap::default(),
documents_count,
original_documents,
flattened_documents,
original_documents: original_documents
.into_inner()
.map_err(|err| InternalError::BufIntoInnerError(err.to_string()))?,
flattened_documents: flattened_documents
.into_inner()
.map_err(|err| InternalError::BufIntoInnerError(err.to_string()))?,
};
let new_facets = output.compute_real_facets(wtxn, self.index)?;

View File

@@ -2,7 +2,7 @@ use std::borrow::Cow;
use std::collections::HashMap;
use std::convert::TryInto;
use std::fs::File;
use std::io;
use std::io::{self, BufReader};
use bytemuck::allocation::pod_collect_to_vec;
use charabia::{Language, Script};
@@ -27,22 +27,22 @@ pub(crate) enum TypedChunk {
FieldIdDocidFacetStrings(grenad::Reader<CursorClonableMmap>),
FieldIdDocidFacetNumbers(grenad::Reader<CursorClonableMmap>),
Documents(grenad::Reader<CursorClonableMmap>),
FieldIdWordcountDocids(grenad::Reader<File>),
FieldIdWordcountDocids(grenad::Reader<BufReader<File>>),
NewDocumentsIds(RoaringBitmap),
WordDocids {
word_docids_reader: grenad::Reader<File>,
exact_word_docids_reader: grenad::Reader<File>,
word_docids_reader: grenad::Reader<BufReader<File>>,
exact_word_docids_reader: grenad::Reader<BufReader<File>>,
},
WordPositionDocids(grenad::Reader<File>),
WordFidDocids(grenad::Reader<File>),
WordPairProximityDocids(grenad::Reader<File>),
FieldIdFacetStringDocids(grenad::Reader<File>),
FieldIdFacetNumberDocids(grenad::Reader<File>),
FieldIdFacetExistsDocids(grenad::Reader<File>),
FieldIdFacetIsNullDocids(grenad::Reader<File>),
FieldIdFacetIsEmptyDocids(grenad::Reader<File>),
GeoPoints(grenad::Reader<File>),
VectorPoints(grenad::Reader<File>),
WordPositionDocids(grenad::Reader<BufReader<File>>),
WordFidDocids(grenad::Reader<BufReader<File>>),
WordPairProximityDocids(grenad::Reader<BufReader<File>>),
FieldIdFacetStringDocids(grenad::Reader<BufReader<File>>),
FieldIdFacetNumberDocids(grenad::Reader<BufReader<File>>),
FieldIdFacetExistsDocids(grenad::Reader<BufReader<File>>),
FieldIdFacetIsNullDocids(grenad::Reader<BufReader<File>>),
FieldIdFacetIsEmptyDocids(grenad::Reader<BufReader<File>>),
GeoPoints(grenad::Reader<BufReader<File>>),
VectorPoints(grenad::Reader<BufReader<File>>),
ScriptLanguageDocids(HashMap<(Script, Language), RoaringBitmap>),
}

View File

@@ -1,12 +1,12 @@
use std::borrow::Cow;
use std::collections::HashSet;
use std::io::BufReader;
use std::io::{BufReader, BufWriter};
use grenad::CompressionType;
use heed::types::ByteSlice;
use super::index_documents::{merge_cbo_roaring_bitmaps, CursorClonableMmap};
use crate::{Index, Result};
use crate::{Index, InternalError, Result};
mod prefix_word;
mod word_prefix;
@@ -119,9 +119,12 @@ pub fn insert_into_database(
pub fn write_into_lmdb_database_without_merging(
wtxn: &mut heed::RwTxn,
database: heed::PolyDatabase,
writer: grenad::Writer<std::fs::File>,
writer: grenad::Writer<BufWriter<std::fs::File>>,
) -> Result<()> {
let file = writer.into_inner()?;
let file = writer
.into_inner()?
.into_inner()
.map_err(|err| InternalError::BufIntoInnerError(err.to_string()))?;
let reader = grenad::Reader::new(BufReader::new(file))?;
if database.is_empty(wtxn)? {
let mut out_iter = database.iter_mut::<_, ByteSlice, ByteSlice>(wtxn)?;