From b00a1dcc005b5b595cf3c946447a66a9be4bca32 Mon Sep 17 00:00:00 2001
From: Tamo
Date: Tue, 15 Jul 2025 23:16:08 +0200
Subject: [PATCH] add an extractor for cellulite in the new pipeline

---
 Cargo.lock                                    | 184 ++++++++++
 crates/milli/src/update/new/document.rs       |  35 +-
 .../src/update/new/extract/geo/cellulite.rs   | 336 ++++++++++++++++++
 .../milli/src/update/new/extract/geo/mod.rs   |   2 +
 .../src/update/new/extract/vectors/mod.rs     |   2 +-
 .../src/update/new/words_prefix_docids.rs     |   2 +-
 6 files changed, 558 insertions(+), 3 deletions(-)
 create mode 100644 crates/milli/src/update/new/extract/geo/cellulite.rs

diff --git a/Cargo.lock b/Cargo.lock
index ceec0a05e..c04089877 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -421,6 +421,15 @@ version = "0.13.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
 
+[[package]]
+name = "approx"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cab112f0a86d568ea0e627cc1d6be74a1e9cd55214684db5561995f6dad897c6"
+dependencies = [
+ "num-traits",
+]
+
 [[package]]
 name = "arbitrary"
 version = "1.4.1"
@@ -1053,6 +1062,21 @@ dependencies = [
  "smallvec",
 ]
 
+[[package]]
+name = "cellulite"
+version = "0.1.0"
+source = "git+https://github.com/irevoire/cellulite?branch=main#3654348942bdc67393e9a7b3c655f7e9a98dee50"
+dependencies = [
+ "geo",
+ "geo-types",
+ "geojson",
+ "h3o",
+ "heed",
+ "ordered-float 5.0.0",
+ "roaring",
+ "thiserror 2.0.12",
+]
+
 [[package]]
 name = "cexpr"
 version = "0.6.0"
@@ -1815,6 +1839,16 @@ dependencies = [
  "bytemuck",
 ]
 
+[[package]]
+name = "earcutr"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "79127ed59a85d7687c409e9978547cffb7dc79675355ed22da6b66fd5f6ead01"
+dependencies = [
+ "itertools 0.11.0",
+ "num-traits",
+]
+
 [[package]]
 name = "either"
 version = "1.15.0"
@@ -2055,6 +2089,18 @@ dependencies = [
  "serde_json",
 ]
 
+[[package]]
+name = "float_eq"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "28a80e3145d8ad11ba0995949bbcf48b9df2be62772b3d351ef017dff6ecb853"
+
+[[package]]
+name = "float_next_after"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
+
 [[package]]
 name = "flume"
 version = "0.11.1"
@@ -2477,6 +2523,59 @@ dependencies = [
  "version_check",
 ]
 
+[[package]]
+name = "geo"
+version = "0.30.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4416397671d8997e9a3e7ad99714f4f00a22e9eaa9b966a5985d2194fc9e02e1"
+dependencies = [
+ "earcutr",
+ "float_next_after",
+ "geo-types",
+ "geographiclib-rs",
+ "i_overlay",
+ "log",
+ "num-traits",
+ "robust",
+ "rstar",
+ "spade",
+]
+
+[[package]]
+name = "geo-types"
+version = "0.7.16"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "62ddb1950450d67efee2bbc5e429c68d052a822de3aad010d28b351fbb705224"
+dependencies = [
+ "approx",
+ "num-traits",
+ "rayon",
+ "rstar",
+ "serde",
+]
+
+[[package]]
+name = "geographiclib-rs"
+version = "0.2.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f611040a2bb37eaa29a78a128d1e92a378a03e0b6e66ae27398d42b1ba9a7841"
+dependencies = [
+ "libm",
+]
+
+[[package]]
+name = "geojson"
+version = "0.24.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e26f3c45b36fccc9cf2805e61d4da6bc4bbd5a3a9589b01afa3a40eff703bd79"
+dependencies = [
+ "geo-types",
+ "log",
+ "serde",
+ "serde_json",
+ "thiserror 2.0.12",
+]
+
 [[package]]
 name = "geoutils"
 version = "0.5.1"
@@ -2586,6 +2685,26 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "h3o"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fd8f6bbd82fcf88ec958095a97201044bc36307ad0ac3dba72106c973e8873a9"
+dependencies = [
+ "ahash 0.8.12",
+ "either",
+ "float_eq",
+ "geo",
+ "h3o-bit",
+ "libm",
+]
+
+[[package]]
+name = "h3o-bit"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6fb45e8060378c0353781abf67e1917b545a6b710d0342d85b70c125af7ef320"
+
 [[package]]
 name = "half"
 version = "2.6.0"
@@ -2670,6 +2789,7 @@ dependencies = [
  "lmdb-master-sys",
  "once_cell",
  "page_size",
+ "serde",
  "synchronoise",
  "url",
 ]
@@ -2850,6 +2970,50 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "i_float"
+version = "1.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "85df3a416829bb955fdc2416c7b73680c8dcea8d731f2c7aa23e1042fe1b8343"
+dependencies = [
+ "serde",
+]
+
+[[package]]
+name = "i_key_sort"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "347c253b4748a1a28baf94c9ce133b6b166f08573157e05afe718812bc599fcd"
+
+[[package]]
+name = "i_overlay"
+version = "2.0.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0542dfef184afdd42174a03dcc0625b6147fb73e1b974b1a08a2a42ac35cee49"
+dependencies = [
+ "i_float",
+ "i_key_sort",
+ "i_shape",
+ "i_tree",
+ "rayon",
+]
+
+[[package]]
+name = "i_shape"
+version = "1.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0a38f5a42678726718ff924f6d4a0e79b129776aeed298f71de4ceedbd091bce"
+dependencies = [
+ "i_float",
+ "serde",
+]
+
+[[package]]
+name = "i_tree"
+version = "0.8.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "155181bc97d770181cf9477da51218a19ee92a8e5be642e796661aee2b601139"
+
 [[package]]
 name = "icu_collections"
 version = "2.0.0"
@@ -3934,6 +4098,7 @@ dependencies = [
  "candle-core",
  "candle-nn",
  "candle-transformers",
+ "cellulite",
  "charabia",
  "concat-arrays",
  "convert_case 0.8.0",
@@ -3947,6 +4112,7 @@ dependencies = [
  "flume",
  "fst",
  "fxhash",
+ "geojson",
  "geoutils",
  "grenad",
  "hashbrown 0.15.4",
@@ -5221,6 +5387,12 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "robust"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4e27ee8bb91ca0adcf0ecb116293afa12d393f9c2b9b9cd54d33e8078fe19839"
+
 [[package]]
 name = "rstar"
 version = "0.12.2"
@@ -5753,6 +5925,18 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "spade"
+version = "2.14.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a14e31a007e9f85c32784b04f89e6e194bb252a4d41b4a8ccd9e77245d901c8c"
+dependencies = [
+ "hashbrown 0.15.4",
+ "num-traits",
+ "robust",
+ "smallvec",
+]
+
 [[package]]
 name = "spin"
 version = "0.5.2"
diff --git a/crates/milli/src/update/new/document.rs b/crates/milli/src/update/new/document.rs
index d520bb952..3e8279d01 100644
--- a/crates/milli/src/update/new/document.rs
+++ b/crates/milli/src/update/new/document.rs
@@ -10,7 +10,9 @@ use serde_json::value::RawValue;
 
 use super::vector_document::VectorDocument;
 use super::{KvReaderFieldId, KvWriterFieldId};
-use crate::constants::{RESERVED_GEO_FIELD_NAME, RESERVED_VECTORS_FIELD_NAME};
+use crate::constants::{
+    RESERVED_GEOJSON_FIELD_NAME, RESERVED_GEO_FIELD_NAME, RESERVED_VECTORS_FIELD_NAME,
+};
 use crate::documents::FieldIdMapper;
 use crate::update::del_add::KvReaderDelAdd;
 use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
@@ -26,6 +28,7 @@ use crate::{
 ///
 /// The 'doc lifetime is meant to live sufficiently for the document to be handled by the extractors.
 pub trait Document<'doc> {
+    fn geojson_field(&self) -> Result<Option<&'doc RawValue>>;
     /// Iterate over all **top-level** fields of the document, returning their name and raw JSON value.
     ///
     /// - The returned values *may* contain nested fields.
@@ -113,6 +116,10 @@ impl<'t, Mapper: FieldIdMapper> Document<'t> for DocumentFromDb<'t, Mapper> {
         self.field(RESERVED_GEO_FIELD_NAME)
     }
 
+    fn geojson_field(&self) -> Result<Option<&'t RawValue>> {
+        self.field(RESERVED_GEOJSON_FIELD_NAME)
+    }
+
     fn top_level_fields_count(&self) -> usize {
         let has_vectors_field = self.vectors_field().unwrap_or(None).is_some();
         let has_geo_field = self.geo_field().unwrap_or(None).is_some();
@@ -177,6 +184,10 @@ impl<'doc> Document<'doc> for DocumentFromVersions<'_, 'doc> {
         Ok(self.versions.geo_field())
     }
 
+    fn geojson_field(&self) -> Result<Option<&'doc RawValue>> {
+        Ok(self.versions.geojson_field())
+    }
+
     fn top_level_fields_count(&self) -> usize {
         let has_vectors_field = self.vectors_field().unwrap_or(None).is_some();
         let has_geo_field = self.geo_field().unwrap_or(None).is_some();
@@ -265,6 +276,16 @@ impl<'d, 'doc: 'd, 't: 'd, Mapper: FieldIdMapper> Document<'d>
         db.geo_field()
     }
 
+    fn geojson_field(&self) -> Result<Option<&'d RawValue>> {
+        if let Some(geojson) = self.new_doc.geojson_field()? {
+            return Ok(Some(geojson));
+        }
+
+        let Some(db) = self.db else { return Ok(None) };
+
+        db.geojson_field()
+    }
+
     fn top_level_fields_count(&self) -> usize {
         self.iter_top_level_fields().count()
     }
@@ -296,6 +317,10 @@ where
         D::geo_field(self)
     }
 
+    fn geojson_field(&self) -> Result<Option<&'doc RawValue>> {
+        D::geojson_field(self)
+    }
+
     fn top_level_fields_count(&self) -> usize {
         D::top_level_fields_count(self)
     }
@@ -454,6 +479,10 @@ impl<'doc> Versions<'doc> {
         self.data.get(RESERVED_GEO_FIELD_NAME)
     }
 
+    pub fn geojson_field(&self) -> Option<&'doc RawValue> {
+        self.data.get(RESERVED_GEOJSON_FIELD_NAME)
+    }
+
     pub fn len(&self) -> usize {
         self.data.len()
     }
@@ -572,6 +601,10 @@ impl<'a, Mapper: FieldIdMapper> Document<'a> for KvDelAddDocument<'a, Mapper> {
     fn geo_field(&self) -> Result<Option<&'a RawValue>> {
         self.get(RESERVED_GEO_FIELD_NAME)
     }
+
+    fn geojson_field(&self) -> Result<Option<&'a RawValue>> {
+        self.get(RESERVED_GEOJSON_FIELD_NAME)
+    }
 }
 
 pub struct DocumentIdentifiers<'doc> {
diff --git a/crates/milli/src/update/new/extract/geo/cellulite.rs b/crates/milli/src/update/new/extract/geo/cellulite.rs
new file mode 100644
index 000000000..0fcde3c4e
--- /dev/null
+++ b/crates/milli/src/update/new/extract/geo/cellulite.rs
@@ -0,0 +1,336 @@
+use std::cell::RefCell;
+use std::fs::File;
+use std::io::{self, BufReader, BufWriter, ErrorKind, Read, Seek as _, Write as _};
+use std::str::FromStr;
+use std::{iter, mem, result};
+
+use bumpalo::Bump;
+use bytemuck::{bytes_of, pod_read_unaligned, Pod, Zeroable};
+use geojson::GeoJson;
+use heed::RoTxn;
+use serde_json::value::RawValue;
+use serde_json::Value;
+
+use crate::error::GeoError;
+use crate::update::new::document::{Document, DocumentContext};
+use crate::update::new::indexer::document_changes::Extractor;
+use crate::update::new::ref_cell_ext::RefCellExt as _;
+use crate::update::new::thread_local::MostlySend;
+use crate::update::new::DocumentChange;
+use crate::update::GrenadParameters;
+use crate::{lat_lng_to_xyz, DocumentId, GeoPoint, Index, InternalError, Result, UserError};
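+
+// Buffering strategy: each indexing thread first accumulates the removed and
+// inserted geojsons in its bump allocator; once the per-thread memory budget
+// from `GrenadParameters` is exceeded, the raw geojson bytes are spilled to
+// temporary files that are replayed at merge time.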
+
+pub struct GeoJsonExtractor {
+    grenad_parameters: GrenadParameters,
+}
+
+impl GeoJsonExtractor {
+    pub fn new(
+        rtxn: &RoTxn,
+        index: &Index,
+        grenad_parameters: GrenadParameters,
+    ) -> Result<Option<Self>> {
+        if index.is_geojson_enabled(rtxn)? {
+            Ok(Some(GeoJsonExtractor { grenad_parameters }))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+/*
+#[derive(Pod, Zeroable, Copy, Clone)]
+#[repr(C, packed)]
+pub struct ExtractedGeoPoint {
+    pub docid: DocumentId,
+    pub lat_lng: [f64; 2],
+}
+
+impl From<ExtractedGeoPoint> for GeoPoint {
+    /// Converts the latitude and longitude back to an xyz GeoPoint.
+    fn from(value: ExtractedGeoPoint) -> Self {
+        let [lat, lng] = value.lat_lng;
+        let point = [lat, lng];
+        let xyz_point = lat_lng_to_xyz(&point);
+        GeoPoint::new(xyz_point, (value.docid, point))
+    }
+}
+*/
+
+pub struct GeoJsonExtractorData<'extractor> {
+    /// The geojsons of the documents that were removed. If a document sees its
+    /// geojson being updated, we first put the old value in `removed` and the
+    /// new one in `inserted`.
+    removed: bumpalo::collections::Vec<'extractor, GeoJson>,
+    inserted: bumpalo::collections::Vec<'extractor, GeoJson>,
+    /// Contains the raw bytes of the removed geojsons if we have spilled to disk.
+    spilled_removed: Option<BufWriter<File>>,
+    /// Contains the raw bytes of the inserted geojsons if we have spilled to disk.
+    spilled_inserted: Option<BufWriter<File>>,
+}
+
+impl<'extractor> GeoJsonExtractorData<'extractor> {
+    pub fn freeze(self) -> Result<FrozenGeoJsonExtractorData<'extractor>> {
+        let GeoJsonExtractorData { removed, inserted, spilled_removed, spilled_inserted } = self;
+
+        Ok(FrozenGeoJsonExtractorData {
+            removed: removed.into_bump_slice(),
+            inserted: inserted.into_bump_slice(),
+            spilled_removed: spilled_removed
+                .map(|bw| bw.into_inner().map(BufReader::new).map_err(|iie| iie.into_error()))
+                .transpose()?,
+            spilled_inserted: spilled_inserted
+                .map(|bw| bw.into_inner().map(BufReader::new).map_err(|iie| iie.into_error()))
+                .transpose()?,
+        })
+    }
+}
+
+unsafe impl MostlySend for GeoJsonExtractorData<'_> {}
+
+pub struct FrozenGeoJsonExtractorData<'extractor> {
+    pub removed: &'extractor [GeoJson],
+    pub inserted: &'extractor [GeoJson],
+    pub spilled_removed: Option<BufReader<File>>,
+    pub spilled_inserted: Option<BufReader<File>>,
+}
+
+impl FrozenGeoJsonExtractorData<'_> {
+    pub fn iter_and_clear_removed(
+        &mut self,
+    ) -> io::Result<impl Iterator<Item = Result<GeoJson, geojson::Error>> + '_> {
+        Ok(mem::take(&mut self.removed)
+            .iter()
+            .cloned()
+            .map(Ok)
+            .chain(iterator_over_spilled_geojsons(&mut self.spilled_removed)?))
+    }
+
+    pub fn iter_and_clear_inserted(
+        &mut self,
+    ) -> io::Result<impl Iterator<Item = Result<GeoJson, geojson::Error>> + '_> {
+        Ok(mem::take(&mut self.inserted)
+            .iter()
+            .cloned()
+            .map(Ok)
+            .chain(iterator_over_spilled_geojsons(&mut self.spilled_inserted)?))
+    }
+}
+
+fn iterator_over_spilled_geojsons(
+    spilled: &mut Option<BufReader<File>>,
+) -> io::Result<impl Iterator<Item = Result<GeoJson, geojson::Error>> + '_> {
+    let mut spilled = spilled.take();
+    if let Some(spilled) = &mut spilled {
+        spilled.rewind()?;
+    }
+
+    Ok(iter::from_fn(move || match &mut spilled {
+        Some(file) => match GeoJson::from_reader(file) {
+            Ok(geojson) => Some(Ok(geojson)),
+            Err(e) if e.is_eof() => None,
+            Err(e) => Some(Err(e)),
+        },
+        None => None,
+    }))
+}
+
+impl<'extractor> Extractor<'extractor> for GeoJsonExtractor {
+    type Data = RefCell<GeoJsonExtractorData<'extractor>>;
+
+    fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
+        Ok(RefCell::new(GeoJsonExtractorData {
+            removed: bumpalo::collections::Vec::new_in(extractor_alloc),
+            inserted: bumpalo::collections::Vec::new_in(extractor_alloc),
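+            // The spill files are created lazily in `process()`, the first
+            // time the per-thread memory budget is exceeded.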
+            spilled_inserted: None,
+            spilled_removed: None,
+        }))
+    }
+
+    fn process<'doc>(
+        &'doc self,
+        changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
+        context: &'doc DocumentContext<Self::Data>,
+    ) -> Result<()> {
+        let rtxn = &context.rtxn;
+        let index = context.index;
+        let max_memory = self.grenad_parameters.max_memory_by_thread();
+        let db_fields_ids_map = context.db_fields_ids_map;
+        let mut data_ref = context.data.borrow_mut_or_yield();
+
+        for change in changes {
+            if data_ref.spilled_removed.is_none()
+                && max_memory.is_some_and(|mm| context.extractor_alloc.allocated_bytes() >= mm)
+            {
+                // We must spill as we allocated too much memory
+                data_ref.spilled_removed = tempfile::tempfile().map(BufWriter::new).map(Some)?;
+                data_ref.spilled_inserted = tempfile::tempfile().map(BufWriter::new).map(Some)?;
+            }
+
+            match change? {
+                DocumentChange::Deletion(deletion) => {
+                    let docid = deletion.docid();
+                    let external_id = deletion.external_document_id();
+                    let current = deletion.current(rtxn, index, db_fields_ids_map)?;
+
+                    if let Some(geojson) = current.geojson_field()? {
+                        match &mut data_ref.spilled_removed {
+                            Some(file) => file.write_all(geojson.get().as_bytes())?,
+                            None => data_ref.removed.push(
+                                // TODO: The error type is wrong here. It should be an internal error.
+                                GeoJson::from_str(geojson.get()).map_err(UserError::from)?,
+                            ),
+                        }
+                    }
+                }
+                DocumentChange::Update(update) => {
+                    let current = update.current(rtxn, index, db_fields_ids_map)?;
+                    let external_id = update.external_document_id();
+                    let docid = update.docid();
+
+                    let current_geo = current.geojson_field()?;
+
+                    let updated_geo =
+                        update.merged(rtxn, index, db_fields_ids_map)?.geojson_field()?;
+
+                    if current_geo.map(|c| c.get()) != updated_geo.map(|u| u.get()) {
+                        // If the current and new geojsons are different it means that
+                        // we need to replace the current one by the new one and
+                        // therefore delete the current one from cellulite.
+                        if let Some(geojson) = current_geo {
+                            match &mut data_ref.spilled_removed {
+                                Some(file) => file.write_all(geojson.get().as_bytes())?,
+                                // TODO: Should be an internal error
+                                None => data_ref.removed.push(
+                                    GeoJson::from_str(geojson.get()).map_err(UserError::from)?,
+                                ),
+                            }
+                        }
+
+                        if let Some(geojson) = updated_geo {
+                            match &mut data_ref.spilled_inserted {
+                                Some(file) => file.write_all(geojson.get().as_bytes())?,
+                                // TODO: Is the error type correct here? Shouldn't it be an internal error?
+                                None => data_ref.inserted.push(
+                                    GeoJson::from_str(geojson.get()).map_err(UserError::from)?,
+                                ),
+                            }
+                        }
+                    }
+                }
+                DocumentChange::Insertion(insertion) => {
+                    let external_id = insertion.external_document_id();
+                    let docid = insertion.docid();
+
+                    let inserted_geo = insertion.inserted().geojson_field()?;
+
+                    if let Some(geojson) = inserted_geo {
+                        match &mut data_ref.spilled_inserted {
+                            Some(file) => file.write_all(geojson.get().as_bytes())?,
+                            // TODO: Is the error type correct here? Shouldn't it be an internal error?
+                            None => data_ref.inserted.push(
+                                GeoJson::from_str(geojson.get()).map_err(UserError::from)?,
+                            ),
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+/// Extracts and validates the latitude and longitude from a document geo field.
+///
+/// It can be of the form `{ "lat": 0.0, "lng": "1.0" }`.
+pub fn extract_geo_coordinates(
+    external_id: &str,
+    raw_value: &RawValue,
+) -> Result<Option<[f64; 2]>> {
+    let mut geo = match serde_json::from_str(raw_value.get()).map_err(InternalError::SerdeJson)? {
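+        // `null` is accepted and treated as the absence of a geo point, an
+        // object is destructured into `lat`/`lng` below, and any other JSON
+        // type is rejected with `GeoError::NotAnObject`.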
+        Value::Null => return Ok(None),
+        Value::Object(map) => map,
+        value => {
+            return Err(Box::new(GeoError::NotAnObject {
+                document_id: Value::from(external_id),
+                value,
+            })
+            .into())
+        }
+    };
+
+    let [lat, lng] = match (geo.remove("lat"), geo.remove("lng")) {
+        (Some(lat), Some(lng)) => {
+            if geo.is_empty() {
+                [lat, lng]
+            } else {
+                return Err(Box::new(GeoError::UnexpectedExtraFields {
+                    document_id: Value::from(external_id),
+                    value: Value::from(geo),
+                })
+                .into());
+            }
+        }
+        (Some(_), None) => {
+            return Err(Box::new(GeoError::MissingLongitude {
+                document_id: Value::from(external_id),
+            })
+            .into())
+        }
+        (None, Some(_)) => {
+            return Err(Box::new(GeoError::MissingLatitude {
+                document_id: Value::from(external_id),
+            })
+            .into())
+        }
+        (None, None) => {
+            return Err(Box::new(GeoError::MissingLatitudeAndLongitude {
+                document_id: Value::from(external_id),
+            })
+            .into())
+        }
+    };
+
+    match (extract_finite_float_from_value(lat), extract_finite_float_from_value(lng)) {
+        (Ok(lat), Ok(lng)) => Ok(Some([lat, lng])),
+        (Ok(_), Err(value)) => {
+            Err(Box::new(GeoError::BadLongitude { document_id: Value::from(external_id), value })
+                .into())
+        }
+        (Err(value), Ok(_)) => {
+            Err(Box::new(GeoError::BadLatitude { document_id: Value::from(external_id), value })
+                .into())
+        }
+        (Err(lat), Err(lng)) => Err(Box::new(GeoError::BadLatitudeAndLongitude {
+            document_id: Value::from(external_id),
+            lat,
+            lng,
+        })
+        .into()),
+    }
+}
+
+/// Extracts and validates that a serde JSON Value is actually a finite f64.
+pub fn extract_finite_float_from_value(value: Value) -> result::Result<f64, Value> {
+    let number = match value {
+        Value::Number(ref n) => match n.as_f64() {
+            Some(number) => number,
+            None => return Err(value),
+        },
+        Value::String(ref s) => match s.parse::<f64>() {
+            Ok(number) => number,
+            Err(_) => return Err(value),
+        },
+        value => return Err(value),
+    };
+
+    if number.is_finite() {
+        Ok(number)
+    } else {
+        Err(value)
+    }
+}
diff --git a/crates/milli/src/update/new/extract/geo/mod.rs b/crates/milli/src/update/new/extract/geo/mod.rs
index 8e164b48f..9c0af1603 100644
--- a/crates/milli/src/update/new/extract/geo/mod.rs
+++ b/crates/milli/src/update/new/extract/geo/mod.rs
@@ -18,6 +18,8 @@ use crate::update::new::DocumentChange;
 use crate::update::GrenadParameters;
 use crate::{lat_lng_to_xyz, DocumentId, GeoPoint, Index, InternalError, Result};
 
+mod cellulite;
+
 pub struct GeoExtractor {
     grenad_parameters: GrenadParameters,
 }
diff --git a/crates/milli/src/update/new/extract/vectors/mod.rs b/crates/milli/src/update/new/extract/vectors/mod.rs
index 4ca68027c..25a26f9fb 100644
--- a/crates/milli/src/update/new/extract/vectors/mod.rs
+++ b/crates/milli/src/update/new/extract/vectors/mod.rs
@@ -630,7 +630,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
 
         settings_delta.try_for_each_fragment_diff(
             session.embedder_name(),
-            |fragment_diff| {
+            |fragment_diff| -> Result<()> {
                 let extractor = RequestFragmentExtractor::new(fragment_diff.new, doc_alloc)
                     .ignore_errors();
                 let old = if full_reindex {
diff --git a/crates/milli/src/update/new/words_prefix_docids.rs b/crates/milli/src/update/new/words_prefix_docids.rs
index 9abd01bac..27c485d71 100644
--- a/crates/milli/src/update/new/words_prefix_docids.rs
+++ b/crates/milli/src/update/new/words_prefix_docids.rs
@@ -193,7 +193,7 @@ impl WordPrefixIntegerDocids {
         // We access this HashMap in parallel to compute the *union* of all
         // of them and *serialize* them into files. There is one file by CPU.
         let local_entries = ThreadLocal::with_capacity(rayon::current_num_threads());
-        prefixes.into_par_iter().map(AsRef::as_ref).try_for_each(|prefix| {
+        prefixes.into_par_iter().map(AsRef::as_ref).try_for_each(|prefix| -> Result<()> {
             let refcell = local_entries.get_or(|| {
                 let file = BufWriter::new(spooled_tempfile(
                     self.max_memory_by_thread.unwrap_or(usize::MAX),