From 6a047519f6df8c8bc3414f39ed69637fbcf62c2d Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Mon, 1 Jun 2020 18:27:26 +0200 Subject: [PATCH] Do a merge two by two --- src/bin/indexer.rs | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/bin/indexer.rs b/src/bin/indexer.rs index c88e014a7..eb5916091 100644 --- a/src/bin/indexer.rs +++ b/src/bin/indexer.rs @@ -285,7 +285,8 @@ fn main() -> anyhow::Result<()> { .open(opt.database)?; let index = Index::new(&env)?; - let mtbl_store = opt.files_to_index + + let mut stores: Vec<_> = opt.files_to_index .into_par_iter() .try_fold(MtblKvStore::default, |acc, path| { let rdr = csv::Reader::from_path(path)?; @@ -295,7 +296,20 @@ fn main() -> anyhow::Result<()> { .inspect(|_| { eprintln!("Total number of documents seen so far is {}", ID_GENERATOR.load(Ordering::Relaxed)) }) - .try_reduce(MtblKvStore::default, MtblKvStore::merge_with)?; + .collect::>()?; + + while stores.len() >= 1 { + let s = std::mem::take(&mut stores); + stores = s.into_par_iter().chunks(2).map(|mut v| { + match (v.pop(), v.pop()) { + (Some(a), Some(b)) => a.merge_with(b), + (Some(a), _) => Ok(a), + _ => unreachable!(), + } + }).collect::>()?; + } + + let mtbl_store = stores.pop().unwrap_or_default(); eprintln!("We are writing into LMDB..."); let mut wtxn = env.write_txn()?;