mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-26 05:26:27 +00:00 
			
		
		
		
	Support CSV again
This commit is contained in:
		| @@ -1,14 +1,18 @@ | ||||
| use std::fmt::{self, Debug, Display}; | ||||
| use std::fs::File; | ||||
| use std::io::{self, BufReader, BufWriter, Write}; | ||||
| use std::io::{self, BufReader, BufWriter, Seek, Write}; | ||||
| use std::marker::PhantomData; | ||||
|  | ||||
| use csv::StringRecord; | ||||
| use memmap2::Mmap; | ||||
| use milli::documents::Error; | ||||
| use milli::update::new::TopLevelMap; | ||||
| use milli::Object; | ||||
| use serde::de::{SeqAccess, Visitor}; | ||||
| use serde::{Deserialize, Deserializer}; | ||||
| use serde_json::error::Category; | ||||
|  | ||||
| use crate::error::deserr_codes::MalformedPayload; | ||||
| use crate::error::{Code, ErrorCode}; | ||||
|  | ||||
| type Result<T> = std::result::Result<T, DocumentFormatError>; | ||||
| @@ -87,6 +91,16 @@ impl From<(PayloadType, Error)> for DocumentFormatError { | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl From<(PayloadType, serde_json::Error)> for DocumentFormatError { | ||||
|     fn from((ty, error): (PayloadType, serde_json::Error)) -> Self { | ||||
|         if error.classify() == Category::Data { | ||||
|             Self::Io(error.into()) | ||||
|         } else { | ||||
|             Self::MalformedPayload(Error::Json(error), ty) | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| impl From<io::Error> for DocumentFormatError { | ||||
|     fn from(error: io::Error) -> Self { | ||||
|         Self::Io(error) | ||||
| @@ -102,67 +116,156 @@ impl ErrorCode for DocumentFormatError { | ||||
|     } | ||||
| } | ||||
|  | ||||
| // TODO remove that from the place I've borrowed it | ||||
| #[derive(Debug)] | ||||
| enum AllowedType { | ||||
|     String, | ||||
|     Boolean, | ||||
|     Number, | ||||
| } | ||||
|  | ||||
| fn parse_csv_header(header: &str) -> (&str, AllowedType) { | ||||
|     // if there are several separators we only split on the last one. | ||||
|     match header.rsplit_once(':') { | ||||
|         Some((field_name, field_type)) => match field_type { | ||||
|             "string" => (field_name, AllowedType::String), | ||||
|             "boolean" => (field_name, AllowedType::Boolean), | ||||
|             "number" => (field_name, AllowedType::Number), | ||||
|             // if the pattern isn't recognized, we keep the whole field. | ||||
|             _otherwise => (header, AllowedType::String), | ||||
|         }, | ||||
|         None => (header, AllowedType::String), | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Reads CSV from input and write an obkv batch to writer. | ||||
| pub fn read_csv<F: Write>( | ||||
|     _input: BufReader<File>, | ||||
|     _output: &mut BufWriter<F>, | ||||
|     _delimiter: u8, | ||||
| ) -> Result<u64> { | ||||
|     todo!() | ||||
|     // let mut builder = DocumentsBatchBuilder::new(BufWriter::new(output)); | ||||
|     // let mmap = unsafe { MmapOptions::new().map(input)? }; | ||||
|     // let csv = csv::ReaderBuilder::new().delimiter(delimiter).from_reader(mmap.as_ref()); | ||||
|     // builder.append_csv(csv).map_err(|e| (PayloadType::Csv { delimiter }, e))?; | ||||
| pub fn read_csv(input: &File, output: impl io::Write, delimiter: u8) -> Result<u64> { | ||||
|     use serde_json::{Map, Value}; | ||||
|  | ||||
|     // let count = builder.documents_count(); | ||||
|     // let _ = builder.into_inner().map_err(DocumentFormatError::Io)?; | ||||
|     let mut output = BufWriter::new(output); | ||||
|     let mut reader = csv::ReaderBuilder::new().delimiter(delimiter).from_reader(input); | ||||
|  | ||||
|     // Ok(count as u64) | ||||
|     // TODO manage error correctly | ||||
|     // Make sure that we insert the fields ids in order as the obkv writer has this requirement. | ||||
|     let mut typed_fields: Vec<_> = reader | ||||
|         .headers() | ||||
|         .unwrap() | ||||
|         .into_iter() | ||||
|         .map(parse_csv_header) | ||||
|         .map(|(f, t)| (f.to_string(), t)) | ||||
|         .collect(); | ||||
|  | ||||
|     let mut object: Map<_, _> = | ||||
|         reader.headers().unwrap().iter().map(|k| (k.to_string(), Value::Null)).collect(); | ||||
|  | ||||
|     let mut line: usize = 0; | ||||
|     let mut record = csv::StringRecord::new(); | ||||
|     while reader.read_record(&mut record).unwrap() { | ||||
|         // We increment here and not at the end of the while loop to take | ||||
|         // the header offset into account. | ||||
|         line += 1; | ||||
|  | ||||
|         // Reset the document to write | ||||
|         object.iter_mut().for_each(|(_, v)| *v = Value::Null); | ||||
|  | ||||
|         for (i, (name, type_)) in typed_fields.iter().enumerate() { | ||||
|             let value = &record[i]; | ||||
|             let trimmed_value = value.trim(); | ||||
|             let value = match type_ { | ||||
|                 AllowedType::Number if trimmed_value.is_empty() => Value::Null, | ||||
|                 AllowedType::Number => match trimmed_value.parse::<i64>() { | ||||
|                     Ok(integer) => Value::from(integer), | ||||
|                     Err(_) => { | ||||
|                         match trimmed_value.parse::<f64>() { | ||||
|                             Ok(float) => Value::from(float), | ||||
|                             Err(error) => { | ||||
|                                 panic!("bad float") | ||||
|                                 // return Err(Error::ParseFloat { | ||||
|                                 //     error, | ||||
|                                 //     line, | ||||
|                                 //     value: value.to_string(), | ||||
|                                 // }); | ||||
|                             } | ||||
|                         } | ||||
|                     } | ||||
|                 }, | ||||
|                 AllowedType::Boolean if trimmed_value.is_empty() => Value::Null, | ||||
|                 AllowedType::Boolean => match trimmed_value.parse::<bool>() { | ||||
|                     Ok(bool) => Value::from(bool), | ||||
|                     Err(error) => { | ||||
|                         panic!("bad bool") | ||||
|                         // return Err(Error::ParseBool { | ||||
|                         //     error, | ||||
|                         //     line, | ||||
|                         //     value: value.to_string(), | ||||
|                         // }); | ||||
|                     } | ||||
|                 }, | ||||
|                 AllowedType::String if value.is_empty() => Value::Null, | ||||
|                 AllowedType::String => Value::from(value), | ||||
|             }; | ||||
|  | ||||
|             *object.get_mut(name).unwrap() = value; | ||||
|         } | ||||
|  | ||||
|         serde_json::to_writer(&mut output, &object).unwrap(); | ||||
|     } | ||||
|  | ||||
|     Ok(line.saturating_sub(1) as u64) | ||||
| } | ||||
|  | ||||
| /// Reads JSON from temporary file and write an obkv batch to writer. | ||||
| pub fn read_json<F: Write>(input: BufReader<File>, mut output: &mut BufWriter<F>) -> Result<u64> { | ||||
| pub fn read_json(input: &File, output: impl io::Write) -> Result<u64> { | ||||
|     // We memory map to be able to deserailize into a TopLevelMap<'pl> that | ||||
|     // does not allocate when possible and only materialize the first/top level. | ||||
|     let input = unsafe { Mmap::map(input).map_err(DocumentFormatError::Io)? }; | ||||
|  | ||||
|     let mut deserializer = serde_json::Deserializer::from_slice(&input); | ||||
|     let mut output = BufWriter::new(output); | ||||
|     let mut count = 0; | ||||
|     let mut deserializer = serde_json::Deserializer::from_reader(input); | ||||
|     match array_each(&mut deserializer, |obj: Object| { | ||||
|  | ||||
|     let count_and_write = |obj: TopLevelMap| { | ||||
|         count += 1; | ||||
|         serde_json::to_writer(&mut output, &obj) | ||||
|     }) { | ||||
|     }; | ||||
|  | ||||
|     match array_each(&mut deserializer, count_and_write) { | ||||
|         // The json data has been deserialized and does not need to be processed again. | ||||
|         // The data has been transferred to the writer during the deserialization process. | ||||
|         Ok(Ok(_)) => (), | ||||
|         Ok(Err(e)) => return Err(DocumentFormatError::Io(e.into())), | ||||
|         Ok(Err(e)) => return Err(DocumentFormatError::from((PayloadType::Json, e))), | ||||
|         Err(e) => { | ||||
|             // Attempt to deserialize a single json string when the cause of the exception is not Category.data | ||||
|             // Other types of deserialisation exceptions are returned directly to the front-end | ||||
|             if e.classify() != serde_json::error::Category::Data { | ||||
|                 return Err(DocumentFormatError::MalformedPayload( | ||||
|                     Error::Json(e), | ||||
|                     PayloadType::Json, | ||||
|                 )); | ||||
|             if e.classify() != Category::Data { | ||||
|                 return Err(DocumentFormatError::from((PayloadType::Json, e))); | ||||
|             } | ||||
|  | ||||
|             todo!("single document/object update") | ||||
|  | ||||
|             // let content: Object = serde_json::from_slice(&mmap) | ||||
|             //     .map_err(Error::Json) | ||||
|             //     .map_err(|e| (PayloadType::Json, e))?; | ||||
|             // serde_json::to_writer(&mut output, &content).unwrap() | ||||
|             let content: Object = serde_json::from_slice(&input) | ||||
|                 .map_err(Error::Json) | ||||
|                 .map_err(|e| (PayloadType::Json, e))?; | ||||
|             serde_json::to_writer(&mut output, &content).unwrap() | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     Ok(count) | ||||
|     match output.into_inner() { | ||||
|         Ok(_) => Ok(count), | ||||
|         Err(ie) => Err(DocumentFormatError::Io(ie.into_error())), | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Reads JSON from temporary file and write it into the writer. | ||||
| pub fn read_ndjson<F: Write>(input: BufReader<File>, mut output: &mut BufWriter<F>) -> Result<u64> { | ||||
| pub fn read_ndjson(input: &File, mut output: impl io::Write) -> Result<u64> { | ||||
|     // We memory map to be able to deserailize into a TopLevelMap<'pl> that | ||||
|     // does not allocate when possible and only materialize the first/top level. | ||||
|     let input = unsafe { Mmap::map(input).map_err(DocumentFormatError::Io)? }; | ||||
|  | ||||
|     let mut count = 0; | ||||
|     for result in serde_json::Deserializer::from_reader(input).into_iter() { | ||||
|     for result in serde_json::Deserializer::from_slice(&input).into_iter() { | ||||
|         count += 1; | ||||
|         // TODO Correctly manage the errors | ||||
|         //      Avoid copying the content: use CowStr from milli (move it elsewhere) | ||||
|         let map: Object = result.unwrap(); | ||||
|         serde_json::to_writer(&mut output, &map).unwrap(); | ||||
|         result | ||||
|             .and_then(|map: TopLevelMap| serde_json::to_writer(&mut output, &map)) | ||||
|             .map_err(|e| DocumentFormatError::from((PayloadType::Ndjson, e)))?; | ||||
|     } | ||||
|  | ||||
|     Ok(count) | ||||
|   | ||||
| @@ -423,7 +423,7 @@ async fn document_addition( | ||||
|         } | ||||
|     }; | ||||
|  | ||||
|     let (uuid, update_file) = index_scheduler.create_update_file(dry_run)?; | ||||
|     let (uuid, mut update_file) = index_scheduler.create_update_file(dry_run)?; | ||||
|  | ||||
|     let temp_file = match tempfile() { | ||||
|         Ok(file) => file, | ||||
| @@ -459,20 +459,14 @@ async fn document_addition( | ||||
|         return Err(MeilisearchHttpError::Payload(ReceivePayload(Box::new(e)))); | ||||
|     } | ||||
|  | ||||
|     let read_file = BufReader::new(buffer.into_inner().into_std().await); | ||||
|     let read_file = buffer.into_inner().into_std().await; | ||||
|     let documents_count = tokio::task::spawn_blocking(move || { | ||||
|         let mut update_file = std::io::BufWriter::new(update_file); | ||||
|         let documents_count = match format { | ||||
|             PayloadType::Json => read_json(read_file, &mut update_file)?, | ||||
|             PayloadType::Csv { delimiter } => read_csv(read_file, &mut update_file, delimiter)?, | ||||
|             PayloadType::Ndjson => read_ndjson(read_file, &mut update_file)?, | ||||
|             PayloadType::Json => read_json(&read_file, &mut update_file)?, | ||||
|             PayloadType::Csv { delimiter } => read_csv(&read_file, &mut update_file, delimiter)?, | ||||
|             PayloadType::Ndjson => read_ndjson(&read_file, &mut update_file)?, | ||||
|         }; | ||||
|         // we NEED to persist the file here because we moved the `udpate_file` in another task. | ||||
|         // TODO better support of errors | ||||
|         let update_file = match update_file.into_inner() { | ||||
|             Ok(update_file) => update_file, | ||||
|             Err(_) => todo!("handle errors"), | ||||
|         }; | ||||
|         update_file.persist()?; | ||||
|         Ok(documents_count) | ||||
|     }) | ||||
|   | ||||
| @@ -9,6 +9,7 @@ use heed::{RoTxn, RwTxn}; | ||||
| pub use partial_dump::PartialDump; | ||||
| use rayon::iter::{IntoParallelIterator, ParallelIterator}; | ||||
| use rayon::ThreadPool; | ||||
| pub use top_level_map::{CowStr, TopLevelMap}; | ||||
| pub use update_by_function::UpdateByFunction; | ||||
|  | ||||
| use super::channel::*; | ||||
|   | ||||
| @@ -1,4 +1,5 @@ | ||||
| pub use document_change::{Deletion, DocumentChange, Insertion, Update}; | ||||
| pub use indexer::{CowStr, TopLevelMap}; | ||||
| pub use items_pool::ItemsPool; | ||||
|  | ||||
| use super::del_add::DelAdd; | ||||
|   | ||||
		Reference in New Issue
	
	Block a user