Compare commits

..

7 Commits

Author SHA1 Message Date
b3952e8b3d Only spawn request threads if necessary 2024-06-18 15:34:39 +02:00
c668043c4f Merge #4617
4617: Destructure `EmbedderOptions` so we don't miss some options r=dureuill a=dureuill

# Pull Request

## Related issue
#4595 was caused by the code not destructuring the embedder options.


## What does this PR do?
This PR adds the missing `url` parameter for ollama, and makes sure a similar issue cannot happen in the future



Co-authored-by: Louis Dureuil <louis@meilisearch.com>
2024-05-02 14:55:32 +00:00
5a305bfdea Remove unused struct 2024-05-02 16:14:37 +02:00
f4dd73ec8c Destructure EmbedderOptions so we don't miss some options 2024-05-02 15:39:36 +02:00
66dce4600d Merge #4603
4603: Update charabia v0.8.10 r=Kerollmops a=ManyTheFish

- Update Charabia v0.8.10
- Add `swedish-recomposition` as an optional feature flag

Co-authored-by: ManyTheFish <many@meilisearch.com>
2024-04-30 13:04:02 +00:00
fe51ceca6d Update lock file 2024-04-30 14:33:37 +02:00
88174b8ae4 Update charabia v0.8.10 2024-04-30 14:30:23 +02:00
9 changed files with 52 additions and 283 deletions

4
Cargo.lock generated
View File

@ -889,9 +889,9 @@ dependencies = [
[[package]]
name = "charabia"
version = "0.8.9"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6a65052f308636e5d5e1777f0dbc07919f5fbac24b6c8ad3e140472e5520de9"
checksum = "933f20f2269b24d32fd5503e7b3c268af902190daf8d9d2b73ed2e75d77c00b4"
dependencies = [
"aho-corasick",
"cow-utils",

View File

@ -12,7 +12,6 @@ use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
mod error;
mod new_writer;
mod reader;
mod writer;

View File

@ -1,251 +0,0 @@
use std::fs::File;
use std::io::{Read, Seek, Write};
use std::path::Path;
use std::result::Result as StdResult;
use flate2::write::GzEncoder;
use flate2::Compression;
use meilisearch_types::milli::documents::{
obkv_to_object, DocumentsBatchCursor, DocumentsBatchIndex, DocumentsBatchReader,
};
use tar::{Builder as TarBuilder, Header};
use time::OffsetDateTime;
use uuid::Uuid;
use crate::{Key, Metadata, Result, TaskId, CURRENT_DUMP_VERSION};
pub struct DumpWriter<W: Write> {
tar: TarBuilder<GzEncoder<W>>,
}
impl<W: Write> DumpWriter<W> {
pub fn new(instance_uuid: Option<Uuid>, writer: W) -> Result<Self> {
/// TODO: should we use a BuffWriter?
let gz_encoder = GzEncoder::new(writer, Compression::default());
let mut tar = TarBuilder::new(gz_encoder);
let mut header = Header::new_gnu();
// Append metadata into metadata.json.
let metadata = Metadata {
dump_version: CURRENT_DUMP_VERSION,
db_version: env!("CARGO_PKG_VERSION").to_string(),
dump_date: OffsetDateTime::now_utc(),
};
let data = serde_json::to_string(&metadata).unwrap();
header.set_size(data.len() as u64);
tar.append_data(&mut header, "metadata.json", data.as_bytes()).unwrap();
// Append instance uid into instance_uid.uuid.
if let Some(instance_uuid) = instance_uuid {
let data = instance_uuid.as_hyphenated().to_string();
header.set_size(data.len() as u64);
tar.append_data(&mut header, "instance_uid.uuid", data.as_bytes()).unwrap();
}
Ok(Self { tar })
}
pub fn dump_keys(&mut self, keys: &[Key]) -> Result<()> {
let mut buffer = Vec::new();
for key in keys {
serde_json::to_writer(&mut buffer, key)?;
buffer.push(b'\n');
}
let mut header = Header::new_gnu();
header.set_path("keys.jsonl");
header.set_size(buffer.len() as u64);
self.tar.append(&mut header, buffer.as_slice())?;
Ok(())
}
pub fn create_tasks(&mut self) -> Result<FileWriter<W>> {
FileWriter::new(&mut self.tar, "tasks/queue.jsonl")
}
pub fn dump_update_file<R: Read + Seek>(
&mut self,
task_uid: TaskId,
update_file: DocumentsBatchReader<R>,
) -> Result<()> {
let path = format!("tasks/update_files/{}.jsonl", task_uid);
let mut fw = FileWriter::new(&mut self.tar, path)?;
let mut serializer = UpdateFileSerializer::new(update_file);
fw.calculate_len(SerializerIteratorReader::new(&mut serializer))?;
serializer.reset();
fw.write_data(SerializerIteratorReader::new(&mut serializer))
}
}
/// An iterator-like abstraction that serializes its next item directly into a
/// caller-provided buffer, avoiding a fresh allocation per item.
trait SerializerIterator {
    /// Appends the serialized form of the next item to `buffer`.
    /// Returns `Ok(false)` when there is nothing left to serialize,
    /// `Ok(true)` after an item was written.
    fn next_serialize_into(&mut self, buffer: &mut Vec<u8>) -> StdResult<bool, std::io::Error>;
}
/// Adapts a `SerializerIterator` into a `std::io::Read`, so serialized items
/// can be streamed (e.g. through `std::io::copy`) without materializing them
/// all at once.
struct SerializerIteratorReader<'i, I: SerializerIterator> {
    iterator: &'i mut I,
    // Bytes of the current item that have been serialized but not yet handed
    // to the caller; drained across successive `read` calls.
    buffer: Vec<u8>,
}

impl<I: SerializerIterator> Read for SerializerIteratorReader<'_, I> {
    fn read(&mut self, buf: &mut [u8]) -> StdResult<usize, std::io::Error> {
        // Number of bytes written into `buf` so far in this call.
        let mut size = 0;
        loop {
            // if the inner buffer is empty, fill it with a new document.
            if self.buffer.is_empty() {
                if !self.iterator.next_serialize_into(&mut self.buffer)? {
                    // nothing more to write, return the written size.
                    return Ok(size);
                }
            }
            let doc_size = self.buffer.len();
            let remaining_size = buf[size..].len();
            if remaining_size < doc_size {
                // if the serialized document size exceed the buf size,
                // drain the inner buffer filling the remaining space.
                // The leftover bytes stay in `self.buffer` for the next call.
                buf[size..].copy_from_slice(&self.buffer[..remaining_size]);
                self.buffer.drain(..remaining_size);
                // then return.
                return Ok(buf.len());
            } else {
                // otherwise write the whole inner buffer into the buf, clear it and continue.
                buf[size..][..doc_size].copy_from_slice(&self.buffer);
                size += doc_size;
                self.buffer.clear();
            }
        }
    }
}

impl<'i, I: SerializerIterator> SerializerIteratorReader<'i, I> {
    fn new(iterator: &'i mut I) -> Self {
        Self { iterator, buffer: Vec::new() }
    }
}
struct UpdateFileSerializer<R: Read> {
cursor: DocumentsBatchCursor<R>,
documents_batch_index: DocumentsBatchIndex,
}
impl<R: Read + Seek> SerializerIterator for UpdateFileSerializer<R> {
fn next_serialize_into(&mut self, buffer: &mut Vec<u8>) -> StdResult<bool, std::io::Error> {
/// TODO: don't unwrap, original version: `cursor.next_document().map_err(milli::Error::from)?`
match self.cursor.next_document().unwrap() {
Some(doc) => {
/// TODO: don't unwrap
let json_value = obkv_to_object(&doc, &self.documents_batch_index).unwrap();
serde_json::to_writer(&mut *buffer, &json_value)?;
buffer.push(b'\n');
Ok(true)
}
None => Ok(false),
}
}
}
impl<R: Read + Seek> UpdateFileSerializer<R> {
fn new(reader: DocumentsBatchReader<R>) -> Self {
let (cursor, documents_batch_index) = reader.into_cursor_and_fields_index();
Self { cursor, documents_batch_index }
}
/// Resets the cursor to be able to read from the start again.
pub fn reset(&mut self) {
self.cursor.reset();
}
}
/// Appends a single streamed file entry to the dump's tar archive.
///
/// Usage is two-phase: `calculate_len` must first consume the data once so the
/// tar header can be sized, then `write_data` consumes it a second time to
/// actually append it to the archive.
pub struct FileWriter<'a, W: Write> {
    header: Header,
    tar: &'a mut TarBuilder<GzEncoder<W>>,
    // Filled in by `calculate_len`; required before `write_data`.
    size: Option<u64>,
}

impl<'a, W: Write> FileWriter<'a, W> {
    /// Prepares a header for an entry stored at `path`; no data is written yet.
    pub(crate) fn new<P: AsRef<Path>>(
        tar: &'a mut TarBuilder<GzEncoder<W>>,
        path: P,
    ) -> Result<Self> {
        let mut header = Header::new_gnu();
        header.set_path(path);
        Ok(Self { header, tar, size: None })
    }

    /// First pass: drains `reader` through a counting sink and records the
    /// byte total for the tar header. Returns the computed size.
    pub fn calculate_len<R: Read>(&mut self, mut reader: R) -> Result<u64> {
        let mut counter = SizeCalculatorWriter::new();
        std::io::copy(&mut reader, &mut counter)?;
        let total = counter.into_inner();
        self.size = Some(total);
        Ok(total)
    }

    /// Second pass: appends `reader`'s bytes to the archive, asserting that
    /// the amount actually read matches the pre-computed size.
    pub fn write_data<R: Read>(mut self, reader: R) -> Result<()> {
        let expected_size =
            self.size.expect("calculate_len must be called before writing the data.");
        self.header.set_size(expected_size);

        let mut counted_reader = SizeCalculatorReader::new(reader);
        self.tar.append(&mut self.header, &mut counted_reader)?;
        assert_eq!(
            expected_size,
            counted_reader.into_inner(),
            "Provided data size is different from the pre-calculated size."
        );
        Ok(())
    }
}
/// A `Write` sink that discards the data and only counts how many bytes were
/// written to it. Used to pre-compute an entry's size before writing it for
/// real.
struct SizeCalculatorWriter {
    // Total number of bytes accepted so far.
    size: usize,
}

impl SizeCalculatorWriter {
    fn new() -> Self {
        Self { size: 0 }
    }

    /// Returns the total number of bytes written through this sink.
    fn into_inner(self) -> u64 {
        self.size as u64
    }
}

impl Write for SizeCalculatorWriter {
    fn write(&mut self, buf: &[u8]) -> StdResult<usize, std::io::Error> {
        self.size += buf.len();
        // BUGFIX: previously returned the *cumulative* `self.size`. The `Write`
        // contract requires the number of bytes consumed by THIS call; callers
        // such as `write_all`/`io::copy` advance their slice by the returned
        // value, so a cumulative count (> buf.len() from the second call on)
        // caused an out-of-bounds slice panic on streams larger than one
        // copy chunk.
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::result::Result<(), std::io::Error> {
        Ok(())
    }
}
/// A pass-through `Read` adapter that tallies the bytes flowing out of the
/// wrapped reader, so the actual stream length can be checked afterwards.
struct SizeCalculatorReader<R: Read> {
    count: usize,
    inner: R,
}

impl<R: Read> Read for SizeCalculatorReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> StdResult<usize, std::io::Error> {
        // Forward to the wrapped reader, then record how much it produced.
        let n = self.inner.read(buf)?;
        self.count += n;
        Ok(n)
    }
}

impl<R: Read> SizeCalculatorReader<R> {
    fn new(reader: R) -> Self {
        Self { count: 0, inner: reader }
    }

    /// Total number of bytes read through this adapter so far.
    fn into_inner(self) -> u64 {
        self.count as u64
    }
}

View File

@ -57,3 +57,5 @@ greek = ["milli/greek"]
khmer = ["milli/khmer"]
# allow vietnamese specialized tokenization
vietnamese = ["milli/vietnamese"]
# force swedish character recomposition
swedish-recomposition = ["milli/swedish-recomposition"]

View File

@ -156,6 +156,7 @@ thai = ["meilisearch-types/thai"]
greek = ["meilisearch-types/greek"]
khmer = ["meilisearch-types/khmer"]
vietnamese = ["meilisearch-types/vietnamese"]
swedish-recomposition = ["meilisearch-types/swedish-recomposition"]
[package.metadata.mini-dashboard]
assets-url = "https://github.com/meilisearch/mini-dashboard/releases/download/v0.2.13/build.zip"

View File

@ -367,12 +367,6 @@ async fn get_version(
})
}
#[derive(Serialize)]
struct KeysResponse {
private: Option<String>,
public: Option<String>,
}
pub async fn get_health(
index_scheduler: Data<IndexScheduler>,
auth_controller: Data<AuthController>,

View File

@ -17,7 +17,7 @@ bincode = "1.3.3"
bstr = "1.9.0"
bytemuck = { version = "1.14.0", features = ["extern_crate_alloc"] }
byteorder = "1.5.0"
charabia = { version = "0.8.9", default-features = false }
charabia = { version = "0.8.10", default-features = false }
concat-arrays = "0.1.2"
crossbeam-channel = "0.5.11"
deserr = "0.6.1"
@ -136,7 +136,11 @@ greek = ["charabia/greek"]
# allow khmer specialized tokenization
khmer = ["charabia/khmer"]
# allow vietnamese specialized tokenization
vietnamese = ["charabia/vietnamese"]
# force swedish character recomposition
swedish-recomposition = ["charabia/swedish-recomposition"]
# allow CUDA support, see <https://github.com/meilisearch/meilisearch/issues/4306>
cuda = ["candle-core/cuda"]

View File

@ -229,12 +229,15 @@ fn send_original_documents_data(
let documents_chunk_cloned = original_documents_chunk.clone();
let lmdb_writer_sx_cloned = lmdb_writer_sx.clone();
let request_threads = ThreadPoolNoAbortBuilder::new()
.num_threads(crate::vector::REQUEST_PARALLELISM)
.thread_name(|index| format!("embedding-request-{index}"))
.build()?;
let new_embedding_configs = settings_diff.new.embedding_configs.clone();
if settings_diff.reindex_vectors() || !settings_diff.settings_update_only() {
if (settings_diff.reindex_vectors() || !settings_diff.settings_update_only())
&& new_embedding_configs.get_default().is_some()
{
let request_threads = ThreadPoolNoAbortBuilder::new()
.num_threads(crate::vector::REQUEST_PARALLELISM)
.thread_name(|index| format!("embedding-request-{index}"))
.build()?;
let settings_diff = settings_diff.clone();
rayon::spawn(move || {
for (name, (embedder, prompt)) in settings_diff.new.embedding_configs.clone() {

View File

@ -301,10 +301,14 @@ impl From<EmbeddingConfig> for EmbeddingSettings {
fn from(value: EmbeddingConfig) -> Self {
let EmbeddingConfig { embedder_options, prompt } = value;
match embedder_options {
super::EmbedderOptions::HuggingFace(options) => Self {
super::EmbedderOptions::HuggingFace(super::hf::EmbedderOptions {
model,
revision,
distribution,
}) => Self {
source: Setting::Set(EmbedderSource::HuggingFace),
model: Setting::Set(options.model),
revision: options.revision.map(Setting::Set).unwrap_or_default(),
model: Setting::Set(model),
revision: revision.map(Setting::Set).unwrap_or_default(),
api_key: Setting::NotSet,
dimensions: Setting::NotSet,
document_template: Setting::Set(prompt.template),
@ -314,14 +318,19 @@ impl From<EmbeddingConfig> for EmbeddingSettings {
path_to_embeddings: Setting::NotSet,
embedding_object: Setting::NotSet,
input_type: Setting::NotSet,
distribution: options.distribution.map(Setting::Set).unwrap_or_default(),
distribution: distribution.map(Setting::Set).unwrap_or_default(),
},
super::EmbedderOptions::OpenAi(options) => Self {
super::EmbedderOptions::OpenAi(super::openai::EmbedderOptions {
api_key,
embedding_model,
dimensions,
distribution,
}) => Self {
source: Setting::Set(EmbedderSource::OpenAi),
model: Setting::Set(options.embedding_model.name().to_owned()),
model: Setting::Set(embedding_model.name().to_owned()),
revision: Setting::NotSet,
api_key: options.api_key.map(Setting::Set).unwrap_or_default(),
dimensions: options.dimensions.map(Setting::Set).unwrap_or_default(),
api_key: api_key.map(Setting::Set).unwrap_or_default(),
dimensions: dimensions.map(Setting::Set).unwrap_or_default(),
document_template: Setting::Set(prompt.template),
url: Setting::NotSet,
query: Setting::NotSet,
@ -329,29 +338,37 @@ impl From<EmbeddingConfig> for EmbeddingSettings {
path_to_embeddings: Setting::NotSet,
embedding_object: Setting::NotSet,
input_type: Setting::NotSet,
distribution: options.distribution.map(Setting::Set).unwrap_or_default(),
distribution: distribution.map(Setting::Set).unwrap_or_default(),
},
super::EmbedderOptions::Ollama(options) => Self {
super::EmbedderOptions::Ollama(super::ollama::EmbedderOptions {
embedding_model,
url,
api_key,
distribution,
}) => Self {
source: Setting::Set(EmbedderSource::Ollama),
model: Setting::Set(options.embedding_model.to_owned()),
model: Setting::Set(embedding_model),
revision: Setting::NotSet,
api_key: options.api_key.map(Setting::Set).unwrap_or_default(),
api_key: api_key.map(Setting::Set).unwrap_or_default(),
dimensions: Setting::NotSet,
document_template: Setting::Set(prompt.template),
url: Setting::NotSet,
url: url.map(Setting::Set).unwrap_or_default(),
query: Setting::NotSet,
input_field: Setting::NotSet,
path_to_embeddings: Setting::NotSet,
embedding_object: Setting::NotSet,
input_type: Setting::NotSet,
distribution: options.distribution.map(Setting::Set).unwrap_or_default(),
distribution: distribution.map(Setting::Set).unwrap_or_default(),
},
super::EmbedderOptions::UserProvided(options) => Self {
super::EmbedderOptions::UserProvided(super::manual::EmbedderOptions {
dimensions,
distribution,
}) => Self {
source: Setting::Set(EmbedderSource::UserProvided),
model: Setting::NotSet,
revision: Setting::NotSet,
api_key: Setting::NotSet,
dimensions: Setting::Set(options.dimensions),
dimensions: Setting::Set(dimensions),
document_template: Setting::NotSet,
url: Setting::NotSet,
query: Setting::NotSet,
@ -359,7 +376,7 @@ impl From<EmbeddingConfig> for EmbeddingSettings {
path_to_embeddings: Setting::NotSet,
embedding_object: Setting::NotSet,
input_type: Setting::NotSet,
distribution: options.distribution.map(Setting::Set).unwrap_or_default(),
distribution: distribution.map(Setting::Set).unwrap_or_default(),
},
super::EmbedderOptions::Rest(super::rest::EmbedderOptions {
api_key,