use a bufreader everytime there is a grenad<file>

This commit is contained in:
Tamo
2023-09-28 16:26:01 +02:00
parent 8fe8ddea79
commit d772073dfa
22 changed files with 122 additions and 86 deletions

View File

@ -1,6 +1,7 @@
use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
use std::fs::File;
use std::io::BufReader;
use std::{io, mem, str};
use charabia::{Language, Script, SeparatorKind, Token, TokenKind, Tokenizer, TokenizerBuilder};
@ -31,7 +32,7 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
allowed_separators: Option<&[&str]>,
dictionary: Option<&[&str]>,
max_positions_per_attributes: Option<u32>,
) -> Result<(RoaringBitmap, grenad::Reader<File>, ScriptLanguageDocidsMap)> {
) -> Result<(RoaringBitmap, grenad::Reader<BufReader<File>>, ScriptLanguageDocidsMap)> {
puffin::profile_function!();
let max_positions_per_attributes = max_positions_per_attributes

View File

@ -1,5 +1,5 @@
use std::fs::File;
use std::io;
use std::io::{self, BufReader};
use heed::{BytesDecode, BytesEncode};
@ -19,7 +19,7 @@ use crate::Result;
pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
docid_fid_facet_number: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
) -> Result<grenad::Reader<BufReader<File>>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();

View File

@ -1,5 +1,5 @@
use std::fs::File;
use std::io;
use std::io::{self, BufReader};
use heed::BytesEncode;
@ -17,7 +17,7 @@ use crate::{FieldId, Result, MAX_FACET_VALUE_LENGTH};
pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
docid_fid_facet_string: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
) -> Result<grenad::Reader<BufReader<File>>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();

View File

@ -1,7 +1,7 @@
use std::collections::{BTreeMap, HashSet};
use std::convert::TryInto;
use std::fs::File;
use std::io;
use std::io::{self, BufReader};
use std::mem::size_of;
use heed::zerocopy::AsBytes;
@ -17,11 +17,11 @@ use crate::{CboRoaringBitmapCodec, DocumentId, FieldId, Result, BEU32, MAX_FACET
/// The extracted facet values stored in grenad files by type.
pub struct ExtractedFacetValues {
pub docid_fid_facet_numbers_chunk: grenad::Reader<File>,
pub docid_fid_facet_strings_chunk: grenad::Reader<File>,
pub fid_facet_is_null_docids_chunk: grenad::Reader<File>,
pub fid_facet_is_empty_docids_chunk: grenad::Reader<File>,
pub fid_facet_exists_docids_chunk: grenad::Reader<File>,
pub docid_fid_facet_numbers_chunk: grenad::Reader<BufReader<File>>,
pub docid_fid_facet_strings_chunk: grenad::Reader<BufReader<File>>,
pub fid_facet_is_null_docids_chunk: grenad::Reader<BufReader<File>>,
pub fid_facet_is_empty_docids_chunk: grenad::Reader<BufReader<File>>,
pub fid_facet_exists_docids_chunk: grenad::Reader<BufReader<File>>,
}
/// Extracts the facet values of each faceted field of each document.

View File

@ -1,6 +1,6 @@
use std::collections::HashMap;
use std::fs::File;
use std::io;
use std::io::{self, BufReader};
use grenad::Sorter;
@ -21,7 +21,7 @@ use crate::{relative_from_absolute_position, DocumentId, FieldId, Result};
pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
) -> Result<grenad::Reader<BufReader<File>>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();

View File

@ -1,5 +1,5 @@
use std::fs::File;
use std::io;
use std::io::{self, BufReader};
use concat_arrays::concat_arrays;
use serde_json::Value;
@ -18,7 +18,7 @@ pub fn extract_geo_points<R: io::Read + io::Seek>(
indexer: GrenadParameters,
primary_key_id: FieldId,
(lat_fid, lng_fid): (FieldId, FieldId),
) -> Result<grenad::Reader<File>> {
) -> Result<grenad::Reader<BufReader<File>>> {
puffin::profile_function!();
let mut writer = create_writer(

View File

@ -1,6 +1,6 @@
use std::convert::TryFrom;
use std::fs::File;
use std::io;
use std::io::{self, BufReader};
use bytemuck::cast_slice;
use serde_json::{from_slice, Value};
@ -18,7 +18,7 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
indexer: GrenadParameters,
primary_key_id: FieldId,
vectors_fid: FieldId,
) -> Result<grenad::Reader<File>> {
) -> Result<grenad::Reader<BufReader<File>>> {
puffin::profile_function!();
let mut writer = create_writer(

View File

@ -1,6 +1,6 @@
use std::collections::HashSet;
use std::fs::File;
use std::io;
use std::io::{self, BufReader};
use std::iter::FromIterator;
use roaring::RoaringBitmap;
@ -26,7 +26,7 @@ pub fn extract_word_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters,
exact_attributes: &HashSet<FieldId>,
) -> Result<(grenad::Reader<File>, grenad::Reader<File>)> {
) -> Result<(grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>)> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();

View File

@ -1,5 +1,5 @@
use std::fs::File;
use std::io;
use std::io::{self, BufReader};
use super::helpers::{
create_sorter, merge_cbo_roaring_bitmaps, read_u32_ne_bytes, sorter_into_reader,
@ -14,7 +14,7 @@ use crate::{relative_from_absolute_position, DocumentId, Result};
pub fn extract_word_fid_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
) -> Result<grenad::Reader<BufReader<File>>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();

View File

@ -1,6 +1,7 @@
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap};
use std::fs::File;
use std::io::BufReader;
use std::{cmp, io, mem, str, vec};
use super::helpers::{
@ -20,7 +21,7 @@ use crate::{DocumentId, Result};
pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
) -> Result<grenad::Reader<BufReader<File>>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();

View File

@ -1,5 +1,5 @@
use std::fs::File;
use std::io;
use std::io::{self, BufReader};
use super::helpers::{
create_sorter, merge_cbo_roaring_bitmaps, read_u32_ne_bytes, sorter_into_reader,
@ -17,7 +17,7 @@ use crate::{bucketed_position, relative_from_absolute_position, DocumentId, Resu
pub fn extract_word_position_docids<R: io::Read + io::Seek>(
docid_word_positions: grenad::Reader<R>,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
) -> Result<grenad::Reader<BufReader<File>>> {
puffin::profile_function!();
let max_memory = indexer.max_memory_by_thread();

View File

@ -12,6 +12,7 @@ mod extract_word_position_docids;
use std::collections::HashSet;
use std::fs::File;
use std::io::BufReader;
use crossbeam_channel::Sender;
use log::debug;
@ -39,8 +40,8 @@ use crate::{FieldId, Result};
/// Send data in grenad file over provided Sender.
#[allow(clippy::too_many_arguments)]
pub(crate) fn data_from_obkv_documents(
original_obkv_chunks: impl Iterator<Item = Result<grenad::Reader<File>>> + Send,
flattened_obkv_chunks: impl Iterator<Item = Result<grenad::Reader<File>>> + Send,
original_obkv_chunks: impl Iterator<Item = Result<grenad::Reader<BufReader<File>>>> + Send,
flattened_obkv_chunks: impl Iterator<Item = Result<grenad::Reader<BufReader<File>>>> + Send,
indexer: GrenadParameters,
lmdb_writer_sx: Sender<Result<TypedChunk>>,
searchable_fields: Option<HashSet<FieldId>>,
@ -152,7 +153,7 @@ pub(crate) fn data_from_obkv_documents(
});
}
spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
docid_word_positions_chunks.clone(),
indexer,
lmdb_writer_sx.clone(),
@ -162,7 +163,7 @@ pub(crate) fn data_from_obkv_documents(
"word-pair-proximity-docids",
);
spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
docid_word_positions_chunks.clone(),
indexer,
lmdb_writer_sx.clone(),
@ -172,7 +173,11 @@ pub(crate) fn data_from_obkv_documents(
"field-id-wordcount-docids",
);
spawn_extraction_task::<_, _, Vec<(grenad::Reader<File>, grenad::Reader<File>)>>(
spawn_extraction_task::<
_,
_,
Vec<(grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>)>,
>(
docid_word_positions_chunks.clone(),
indexer,
lmdb_writer_sx.clone(),
@ -185,7 +190,7 @@ pub(crate) fn data_from_obkv_documents(
"word-docids",
);
spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
docid_word_positions_chunks.clone(),
indexer,
lmdb_writer_sx.clone(),
@ -194,7 +199,7 @@ pub(crate) fn data_from_obkv_documents(
TypedChunk::WordPositionDocids,
"word-position-docids",
);
spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
docid_word_positions_chunks,
indexer,
lmdb_writer_sx.clone(),
@ -204,7 +209,7 @@ pub(crate) fn data_from_obkv_documents(
"word-fid-docids",
);
spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
docid_fid_facet_strings_chunks,
indexer,
lmdb_writer_sx.clone(),
@ -214,7 +219,7 @@ pub(crate) fn data_from_obkv_documents(
"field-id-facet-string-docids",
);
spawn_extraction_task::<_, _, Vec<grenad::Reader<File>>>(
spawn_extraction_task::<_, _, Vec<grenad::Reader<BufReader<File>>>>(
docid_fid_facet_numbers_chunks,
indexer,
lmdb_writer_sx,
@ -269,7 +274,7 @@ fn spawn_extraction_task<FE, FS, M>(
/// Extract chunked data and send it into lmdb_writer_sx sender:
/// - documents
fn send_original_documents_data(
original_documents_chunk: Result<grenad::Reader<File>>,
original_documents_chunk: Result<grenad::Reader<BufReader<File>>>,
indexer: GrenadParameters,
lmdb_writer_sx: Sender<Result<TypedChunk>>,
vectors_field_id: Option<FieldId>,
@ -311,7 +316,7 @@ fn send_original_documents_data(
#[allow(clippy::too_many_arguments)]
#[allow(clippy::type_complexity)]
fn send_and_extract_flattened_documents_data(
flattened_documents_chunk: Result<grenad::Reader<File>>,
flattened_documents_chunk: Result<grenad::Reader<BufReader<File>>>,
indexer: GrenadParameters,
lmdb_writer_sx: Sender<Result<TypedChunk>>,
searchable_fields: &Option<HashSet<FieldId>>,
@ -328,7 +333,10 @@ fn send_and_extract_flattened_documents_data(
grenad::Reader<CursorClonableMmap>,
(
grenad::Reader<CursorClonableMmap>,
(grenad::Reader<File>, (grenad::Reader<File>, grenad::Reader<File>)),
(
grenad::Reader<BufReader<File>>,
(grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>),
),
),
),
)> {