refactor spawn_extraction_task

This commit is contained in:
ad hoc
2022-03-23 14:48:15 +01:00
parent f82d4b36eb
commit 5f9f82757d
3 changed files with 69 additions and 30 deletions

View File

@ -78,25 +78,62 @@ pub unsafe fn as_cloneable_grenad(
Ok(reader)
}
pub fn merge_readers<R: io::Read + io::Seek>(
readers: Vec<grenad::Reader<R>>,
merge_fn: MergeFn,
indexer: GrenadParameters,
) -> Result<grenad::Reader<File>> {
let mut merger_builder = grenad::MergerBuilder::new(merge_fn);
for reader in readers {
merger_builder.push(reader.into_cursor()?);
/// A collection of readers that can be collapsed into a single merged output.
///
/// Implementors are consumed by [`MergeableReader::merge`], which is why the
/// trait requires `Sized`.
pub trait MergeableReader: Sized {
    /// What the merge produces once it succeeds.
    type Output;

    /// Consumes `self` and merges its contents using `merge_fn`.
    ///
    /// `indexer` carries the grenad parameters handed to the merge; the
    /// implementations in this file read the chunk compression settings
    /// from it when writing the merged output.
    ///
    /// # Errors
    ///
    /// Returns an error when the underlying merge fails.
    fn merge(self, merge_fn: MergeFn, indexer: &GrenadParameters) -> Result<Self::Output>;
}
/// Merges a list of file-backed readers into one file-backed reader.
impl MergeableReader for Vec<grenad::Reader<File>> {
    type Output = grenad::Reader<File>;

    /// Pushes every reader of the vector, in order, into a single merger and
    /// finishes it with `params`.
    ///
    /// # Errors
    ///
    /// Stops at the first reader that cannot be pushed, or fails if the final
    /// merge step fails.
    fn merge(self, merge_fn: MergeFn, params: &GrenadParameters) -> Result<Self::Output> {
        let mut builder = MergerBuilder::new(merge_fn);
        for reader in self {
            builder.push(reader)?;
        }
        builder.finish(params)
    }
}
/// Merges a list of reader pairs component-wise: all first elements end up in
/// one merged reader and all second elements in another.
impl MergeableReader for Vec<(grenad::Reader<File>, grenad::Reader<File>)> {
    type Output = (grenad::Reader<File>, grenad::Reader<File>);

    /// Feeds each pair into two independent mergers (both using `merge_fn`),
    /// then finishes the first merger followed by the second.
    ///
    /// # Errors
    ///
    /// Stops at the first push that fails; otherwise fails if either final
    /// merge step fails.
    fn merge(self, merge_fn: MergeFn, params: &GrenadParameters) -> Result<Self::Output> {
        let mut first = MergerBuilder::new(merge_fn);
        let mut second = MergerBuilder::new(merge_fn);

        // Within a pair the first reader is always pushed before the second.
        self.into_iter().try_for_each(|(left, right)| {
            first.push(left)?;
            second.push(right)
        })?;

        let merged_first = first.finish(params)?;
        let merged_second = second.finish(params)?;
        Ok((merged_first, merged_second))
    }
}
/// Private newtype around `grenad::MergerBuilder` whose merge-function type
/// parameter is fixed to `MergeFn`.
struct MergerBuilder<R>(grenad::MergerBuilder<R, MergeFn>);
impl<R: io::Read + io::Seek> MergerBuilder<R> {
/// Creates an empty builder that will merge entries with `merge_fn`.
fn new(merge_fn: MergeFn) -> Self {
    let inner = grenad::MergerBuilder::new(merge_fn);
    Self(inner)
}
let merger = merger_builder.build();
let mut writer = create_writer(
indexer.chunk_compression_type,
indexer.chunk_compression_level,
tempfile::tempfile()?,
);
merger.write_into_stream_writer(&mut writer)?;
/// Adds `reader` as a merge source by converting it into a cursor.
///
/// # Errors
///
/// Fails if the reader cannot be turned into a cursor.
fn push(&mut self, reader: grenad::Reader<R>) -> Result<()> {
    let cursor = reader.into_cursor()?;
    self.0.push(cursor);
    Ok(())
}
Ok(writer_into_reader(writer)?)
fn finish(self, params: &GrenadParameters) -> Result<grenad::Reader<File>> {
let merger = self.0.build();
let mut writer = create_writer(
params.chunk_compression_type,
params.chunk_compression_level,
tempfile::tempfile()?,
);
merger.write_into_stream_writer(&mut writer)?;
Ok(writer_into_reader(writer)?)
}
}
#[derive(Debug, Clone, Copy)]