Mirror of https://github.com/meilisearch/meilisearch.git, synced 2025-07-22 14:21:03 +00:00

Compare commits: stream-dum...tmp-spawn- (7 commits)

Commit SHA1s:
b3952e8b3d
c668043c4f
5a305bfdea
f4dd73ec8c
66dce4600d
fe51ceca6d
88174b8ae4
Cargo.lock (generated), 4 changed lines

@@ -889,9 +889,9 @@ dependencies = [
 [[package]]
 name = "charabia"
-version = "0.8.9"
+version = "0.8.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f6a65052f308636e5d5e1777f0dbc07919f5fbac24b6c8ad3e140472e5520de9"
+checksum = "933f20f2269b24d32fd5503e7b3c268af902190daf8d9d2b73ed2e75d77c00b4"
 dependencies = [
  "aho-corasick",
  "cow-utils",
@@ -12,7 +12,6 @@ use serde::{Deserialize, Serialize};
 use time::OffsetDateTime;

 mod error;
-mod new_writer;
 mod reader;
 mod writer;
@@ -1,251 +0,0 @@
use std::fs::File;
use std::io::{Read, Seek, Write};
use std::path::Path;
use std::result::Result as StdResult;

use flate2::write::GzEncoder;
use flate2::Compression;
use meilisearch_types::milli::documents::{
    obkv_to_object, DocumentsBatchCursor, DocumentsBatchIndex, DocumentsBatchReader,
};
use tar::{Builder as TarBuilder, Header};
use time::OffsetDateTime;
use uuid::Uuid;

use crate::{Key, Metadata, Result, TaskId, CURRENT_DUMP_VERSION};

pub struct DumpWriter<W: Write> {
    tar: TarBuilder<GzEncoder<W>>,
}

impl<W: Write> DumpWriter<W> {
    pub fn new(instance_uuid: Option<Uuid>, writer: W) -> Result<Self> {
        // TODO: should we use a BufWriter?
        let gz_encoder = GzEncoder::new(writer, Compression::default());
        let mut tar = TarBuilder::new(gz_encoder);

        let mut header = Header::new_gnu();

        // Append metadata into metadata.json.
        let metadata = Metadata {
            dump_version: CURRENT_DUMP_VERSION,
            db_version: env!("CARGO_PKG_VERSION").to_string(),
            dump_date: OffsetDateTime::now_utc(),
        };
        let data = serde_json::to_string(&metadata).unwrap();
        header.set_size(data.len() as u64);
        tar.append_data(&mut header, "metadata.json", data.as_bytes()).unwrap();

        // Append instance uid into instance_uid.uuid.
        if let Some(instance_uuid) = instance_uuid {
            let data = instance_uuid.as_hyphenated().to_string();
            header.set_size(data.len() as u64);
            tar.append_data(&mut header, "instance_uid.uuid", data.as_bytes()).unwrap();
        }

        Ok(Self { tar })
    }

    pub fn dump_keys(&mut self, keys: &[Key]) -> Result<()> {
        let mut buffer = Vec::new();
        for key in keys {
            serde_json::to_writer(&mut buffer, key)?;
            buffer.push(b'\n');
        }
        let mut header = Header::new_gnu();
        header.set_path("keys.jsonl");
        header.set_size(buffer.len() as u64);

        self.tar.append(&mut header, buffer.as_slice())?;
        Ok(())
    }

    pub fn create_tasks(&mut self) -> Result<FileWriter<W>> {
        FileWriter::new(&mut self.tar, "tasks/queue.jsonl")
    }

    pub fn dump_update_file<R: Read + Seek>(
        &mut self,
        task_uid: TaskId,
        update_file: DocumentsBatchReader<R>,
    ) -> Result<()> {
        let path = format!("tasks/update_files/{}.jsonl", task_uid);
        let mut fw = FileWriter::new(&mut self.tar, path)?;
        let mut serializer = UpdateFileSerializer::new(update_file);
        fw.calculate_len(SerializerIteratorReader::new(&mut serializer))?;
        serializer.reset();
        fw.write_data(SerializerIteratorReader::new(&mut serializer))
    }
}

trait SerializerIterator {
    fn next_serialize_into(&mut self, buffer: &mut Vec<u8>) -> StdResult<bool, std::io::Error>;
}

struct SerializerIteratorReader<'i, I: SerializerIterator> {
    iterator: &'i mut I,
    buffer: Vec<u8>,
}

impl<I: SerializerIterator> Read for SerializerIteratorReader<'_, I> {
    fn read(&mut self, buf: &mut [u8]) -> StdResult<usize, std::io::Error> {
        let mut size = 0;
        loop {
            // If the inner buffer is empty, fill it with a new document.
            if self.buffer.is_empty() {
                if !self.iterator.next_serialize_into(&mut self.buffer)? {
                    // Nothing more to write, return the written size.
                    return Ok(size);
                }
            }

            let doc_size = self.buffer.len();
            let remaining_size = buf[size..].len();
            if remaining_size < doc_size {
                // If the serialized document size exceeds the buf size,
                // drain the inner buffer filling the remaining space.
                buf[size..].copy_from_slice(&self.buffer[..remaining_size]);
                self.buffer.drain(..remaining_size);

                // Then return.
                return Ok(buf.len());
            } else {
                // Otherwise write the whole inner buffer into the buf, clear it and continue.
                buf[size..][..doc_size].copy_from_slice(&self.buffer);
                size += doc_size;
                self.buffer.clear();
            }
        }
    }
}

impl<'i, I: SerializerIterator> SerializerIteratorReader<'i, I> {
    fn new(iterator: &'i mut I) -> Self {
        Self { iterator, buffer: Vec::new() }
    }
}

struct UpdateFileSerializer<R: Read> {
    cursor: DocumentsBatchCursor<R>,
    documents_batch_index: DocumentsBatchIndex,
}

impl<R: Read + Seek> SerializerIterator for UpdateFileSerializer<R> {
    fn next_serialize_into(&mut self, buffer: &mut Vec<u8>) -> StdResult<bool, std::io::Error> {
        // TODO: don't unwrap, original version: `cursor.next_document().map_err(milli::Error::from)?`
        match self.cursor.next_document().unwrap() {
            Some(doc) => {
                // TODO: don't unwrap
                let json_value = obkv_to_object(&doc, &self.documents_batch_index).unwrap();
                serde_json::to_writer(&mut *buffer, &json_value)?;
                buffer.push(b'\n');
                Ok(true)
            }
            None => Ok(false),
        }
    }
}

impl<R: Read + Seek> UpdateFileSerializer<R> {
    fn new(reader: DocumentsBatchReader<R>) -> Self {
        let (cursor, documents_batch_index) = reader.into_cursor_and_fields_index();

        Self { cursor, documents_batch_index }
    }

    /// Resets the cursor to be able to read from the start again.
    pub fn reset(&mut self) {
        self.cursor.reset();
    }
}

pub struct FileWriter<'a, W: Write> {
    header: Header,
    tar: &'a mut TarBuilder<GzEncoder<W>>,
    size: Option<u64>,
}

impl<'a, W: Write> FileWriter<'a, W> {
    pub(crate) fn new<P: AsRef<Path>>(
        tar: &'a mut TarBuilder<GzEncoder<W>>,
        path: P,
    ) -> Result<Self> {
        let mut header = Header::new_gnu();
        header.set_path(path);
        Ok(Self { header, tar, size: None })
    }

    pub fn calculate_len<R: Read>(&mut self, mut reader: R) -> Result<u64> {
        let mut calculator = SizeCalculatorWriter::new();
        std::io::copy(&mut reader, &mut calculator)?;
        let size = calculator.into_inner();
        self.size = Some(size);

        Ok(size)
    }

    pub fn write_data<R: Read>(mut self, reader: R) -> Result<()> {
        let expected_size =
            self.size.expect("calculate_len must be called before writing the data.");
        self.header.set_size(expected_size);

        let mut scr = SizeCalculatorReader::new(reader);
        self.tar.append(&mut self.header, &mut scr)?;
        assert_eq!(
            expected_size,
            scr.into_inner(),
            "Provided data size is different from the pre-calculated size."
        );

        Ok(())
    }
}

struct SizeCalculatorWriter {
    size: usize,
}

impl SizeCalculatorWriter {
    fn new() -> Self {
        Self { size: 0 }
    }

    fn into_inner(self) -> u64 {
        self.size as u64
    }
}

impl Write for SizeCalculatorWriter {
    fn write(&mut self, buf: &[u8]) -> StdResult<usize, std::io::Error> {
        self.size += buf.len();
        // Report the bytes consumed by this call, per the Write contract.
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::result::Result<(), std::io::Error> {
        Ok(())
    }
}

struct SizeCalculatorReader<R: Read> {
    size: usize,
    reader: R,
}

impl<R: Read> SizeCalculatorReader<R> {
    fn new(reader: R) -> Self {
        Self { size: 0, reader }
    }

    fn into_inner(self) -> u64 {
        self.size as u64
    }
}

impl<R: Read> Read for SizeCalculatorReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> StdResult<usize, std::io::Error> {
        let size = self.reader.read(buf)?;
        self.size += size;

        Ok(size)
    }
}
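The central trick of this deleted writer is worth spelling out: a tar header must carry the entry's size before any data is appended, so `dump_update_file` streamed the serialized documents twice, once through `calculate_len` into a counting sink and, after `reset()`, once more through `write_data` into the archive. A minimal, self-contained sketch of that counting-sink idea (the names here are illustrative, not from the codebase):

use std::io::{self, Read, Write};

// Counts bytes without storing them, mirroring SizeCalculatorWriter above.
struct CountingSink(u64);

impl Write for CountingSink {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.0 += buf.len() as u64;
        // Per the Write contract, report the bytes consumed by this call.
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

// First pass: measure the stream. The caller then rewinds its source and
// performs the second pass with the now-known size set in the tar header.
fn measure<R: Read>(mut reader: R) -> io::Result<u64> {
    let mut sink = CountingSink(0);
    io::copy(&mut reader, &mut sink)
}

The trade-off is that every update file is serialized twice, in exchange for never having to buffer a whole entry in memory before appending it.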
@@ -57,3 +57,5 @@ greek = ["milli/greek"]
 khmer = ["milli/khmer"]
 # allow vietnamese specialized tokenization
 vietnamese = ["milli/vietnamese"]
+# force swedish character recomposition
+swedish-recomposition = ["milli/swedish-recomposition"]
@@ -156,6 +156,7 @@ thai = ["meilisearch-types/thai"]
 greek = ["meilisearch-types/greek"]
 khmer = ["meilisearch-types/khmer"]
 vietnamese = ["meilisearch-types/vietnamese"]
+swedish-recomposition = ["meilisearch-types/swedish-recomposition"]

 [package.metadata.mini-dashboard]
 assets-url = "https://github.com/meilisearch/mini-dashboard/releases/download/v0.2.13/build.zip"
@@ -367,12 +367,6 @@ async fn get_version(
     })
 }

-#[derive(Serialize)]
-struct KeysResponse {
-    private: Option<String>,
-    public: Option<String>,
-}
-
 pub async fn get_health(
     index_scheduler: Data<IndexScheduler>,
     auth_controller: Data<AuthController>,
@@ -17,7 +17,7 @@ bincode = "1.3.3"
 bstr = "1.9.0"
 bytemuck = { version = "1.14.0", features = ["extern_crate_alloc"] }
 byteorder = "1.5.0"
-charabia = { version = "0.8.9", default-features = false }
+charabia = { version = "0.8.10", default-features = false }
 concat-arrays = "0.1.2"
 crossbeam-channel = "0.5.11"
 deserr = "0.6.1"
@@ -136,7 +136,11 @@ greek = ["charabia/greek"]
 # allow khmer specialized tokenization
 khmer = ["charabia/khmer"]

 # allow vietnamese specialized tokenization
 vietnamese = ["charabia/vietnamese"]

+# force swedish character recomposition
+swedish-recomposition = ["charabia/swedish-recomposition"]
+
 # allow CUDA support, see <https://github.com/meilisearch/meilisearch/issues/4306>
 cuda = ["candle-core/cuda"]
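Taken together, the three manifest changes thread the new flag down the dependency chain: meilisearch's `swedish-recomposition` enables `meilisearch-types/swedish-recomposition`, which enables `milli/swedish-recomposition`, which enables `charabia/swedish-recomposition`. A hedged sketch of how such a cargo feature is typically consumed at the bottom of the chain; the function is hypothetical, not taken from charabia:

use std::borrow::Cow;

// Hypothetical illustration of cargo feature gating; not code from this PR.
// Built with `--features swedish-recomposition`, the first definition is
// compiled in; otherwise the pass-through fallback is used.
#[cfg(feature = "swedish-recomposition")]
fn maybe_recompose(text: &str) -> Cow<'_, str> {
    // A real implementation would recompose decomposed characters
    // (e.g. "a" followed by U+030A combining ring above into "å") here.
    Cow::Owned(text.replace("a\u{30A}", "å"))
}

#[cfg(not(feature = "swedish-recomposition"))]
fn maybe_recompose(text: &str) -> Cow<'_, str> {
    // Without the feature, the text passes through untouched.
    Cow::Borrowed(text)
}

Declaring the feature at every layer is what lets a top-level `cargo build --features swedish-recomposition` reach a transitive dependency several crates down.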
@@ -229,12 +229,15 @@ fn send_original_documents_data(
     let documents_chunk_cloned = original_documents_chunk.clone();
     let lmdb_writer_sx_cloned = lmdb_writer_sx.clone();

-    let request_threads = ThreadPoolNoAbortBuilder::new()
-        .num_threads(crate::vector::REQUEST_PARALLELISM)
-        .thread_name(|index| format!("embedding-request-{index}"))
-        .build()?;
+    let new_embedding_configs = settings_diff.new.embedding_configs.clone();

-    if settings_diff.reindex_vectors() || !settings_diff.settings_update_only() {
+    if (settings_diff.reindex_vectors() || !settings_diff.settings_update_only())
+        && new_embedding_configs.get_default().is_some()
+    {
+        let request_threads = ThreadPoolNoAbortBuilder::new()
+            .num_threads(crate::vector::REQUEST_PARALLELISM)
+            .thread_name(|index| format!("embedding-request-{index}"))
+            .build()?;
         let settings_diff = settings_diff.clone();
         rayon::spawn(move || {
             for (name, (embedder, prompt)) in settings_diff.new.embedding_configs.clone() {
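The effect of this hunk: the embedding-request thread pool is no longer built unconditionally on every call. Its construction moves inside the branch, and the branch itself now additionally requires that a default embedder is configured (`new_embedding_configs.get_default().is_some()`). A schematic of the before/after shape, with placeholder types standing in for the real ones:

// Schematic only; placeholder types stand in for ThreadPoolNoAbortBuilder etc.
struct Pool;

fn build_pool() -> Pool {
    // Imagine expensive thread-pool construction here.
    Pool
}

fn send_data(needs_vectors: bool, has_default_embedder: bool) {
    // Before: `let pool = build_pool();` sat here, paying the cost even
    // when the branch below never ran.
    if needs_vectors && has_default_embedder {
        let _pool = build_pool();
        // ... dispatch embedding requests onto the pool ...
    }
}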
@@ -301,10 +301,14 @@ impl From<EmbeddingConfig> for EmbeddingSettings {
     fn from(value: EmbeddingConfig) -> Self {
         let EmbeddingConfig { embedder_options, prompt } = value;
         match embedder_options {
-            super::EmbedderOptions::HuggingFace(options) => Self {
+            super::EmbedderOptions::HuggingFace(super::hf::EmbedderOptions {
+                model,
+                revision,
+                distribution,
+            }) => Self {
                 source: Setting::Set(EmbedderSource::HuggingFace),
-                model: Setting::Set(options.model),
-                revision: options.revision.map(Setting::Set).unwrap_or_default(),
+                model: Setting::Set(model),
+                revision: revision.map(Setting::Set).unwrap_or_default(),
                 api_key: Setting::NotSet,
                 dimensions: Setting::NotSet,
                 document_template: Setting::Set(prompt.template),
@@ -314,14 +318,19 @@ impl From<EmbeddingConfig> for EmbeddingSettings {
                 path_to_embeddings: Setting::NotSet,
                 embedding_object: Setting::NotSet,
                 input_type: Setting::NotSet,
-                distribution: options.distribution.map(Setting::Set).unwrap_or_default(),
+                distribution: distribution.map(Setting::Set).unwrap_or_default(),
             },
-            super::EmbedderOptions::OpenAi(options) => Self {
+            super::EmbedderOptions::OpenAi(super::openai::EmbedderOptions {
+                api_key,
+                embedding_model,
+                dimensions,
+                distribution,
+            }) => Self {
                 source: Setting::Set(EmbedderSource::OpenAi),
-                model: Setting::Set(options.embedding_model.name().to_owned()),
+                model: Setting::Set(embedding_model.name().to_owned()),
                 revision: Setting::NotSet,
-                api_key: options.api_key.map(Setting::Set).unwrap_or_default(),
-                dimensions: options.dimensions.map(Setting::Set).unwrap_or_default(),
+                api_key: api_key.map(Setting::Set).unwrap_or_default(),
+                dimensions: dimensions.map(Setting::Set).unwrap_or_default(),
                 document_template: Setting::Set(prompt.template),
                 url: Setting::NotSet,
                 query: Setting::NotSet,
@@ -329,29 +338,37 @@ impl From<EmbeddingConfig> for EmbeddingSettings {
                 path_to_embeddings: Setting::NotSet,
                 embedding_object: Setting::NotSet,
                 input_type: Setting::NotSet,
-                distribution: options.distribution.map(Setting::Set).unwrap_or_default(),
+                distribution: distribution.map(Setting::Set).unwrap_or_default(),
             },
-            super::EmbedderOptions::Ollama(options) => Self {
+            super::EmbedderOptions::Ollama(super::ollama::EmbedderOptions {
+                embedding_model,
+                url,
+                api_key,
+                distribution,
+            }) => Self {
                 source: Setting::Set(EmbedderSource::Ollama),
-                model: Setting::Set(options.embedding_model.to_owned()),
+                model: Setting::Set(embedding_model),
                 revision: Setting::NotSet,
-                api_key: options.api_key.map(Setting::Set).unwrap_or_default(),
+                api_key: api_key.map(Setting::Set).unwrap_or_default(),
                 dimensions: Setting::NotSet,
                 document_template: Setting::Set(prompt.template),
-                url: Setting::NotSet,
+                url: url.map(Setting::Set).unwrap_or_default(),
                 query: Setting::NotSet,
                 input_field: Setting::NotSet,
                 path_to_embeddings: Setting::NotSet,
                 embedding_object: Setting::NotSet,
                 input_type: Setting::NotSet,
-                distribution: options.distribution.map(Setting::Set).unwrap_or_default(),
+                distribution: distribution.map(Setting::Set).unwrap_or_default(),
             },
-            super::EmbedderOptions::UserProvided(options) => Self {
+            super::EmbedderOptions::UserProvided(super::manual::EmbedderOptions {
+                dimensions,
+                distribution,
+            }) => Self {
                 source: Setting::Set(EmbedderSource::UserProvided),
                 model: Setting::NotSet,
                 revision: Setting::NotSet,
                 api_key: Setting::NotSet,
-                dimensions: Setting::Set(options.dimensions),
+                dimensions: Setting::Set(dimensions),
                 document_template: Setting::NotSet,
                 url: Setting::NotSet,
                 query: Setting::NotSet,
@@ -359,7 +376,7 @@ impl From<EmbeddingConfig> for EmbeddingSettings {
                 path_to_embeddings: Setting::NotSet,
                 embedding_object: Setting::NotSet,
                 input_type: Setting::NotSet,
-                distribution: options.distribution.map(Setting::Set).unwrap_or_default(),
+                distribution: distribution.map(Setting::Set).unwrap_or_default(),
             },
             super::EmbedderOptions::Rest(super::rest::EmbedderOptions {
                 api_key,
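The recurring change in these arms is a style with teeth: binding the whole payload as `options` is replaced by exhaustively destructuring each variant's fields. Since none of the patterns use `..`, adding a field to any of the `EmbedderOptions` structs later makes every such arm fail to compile until the new field is accounted for, and it also drops the `options.` prefix from each line. A standalone sketch of the mechanism (illustrative types, not from this codebase):

// Illustrative types; not from this codebase.
struct HfOptions {
    model: String,
    revision: Option<String>,
}

enum EmbedderOptions {
    HuggingFace(HfOptions),
}

fn describe(opts: EmbedderOptions) -> String {
    match opts {
        // Exhaustive destructuring (no `..`): adding a field to HfOptions
        // turns this arm into a compile error until it is handled here.
        EmbedderOptions::HuggingFace(HfOptions { model, revision }) => {
            format!("{model}@{}", revision.unwrap_or_else(|| "main".into()))
        }
    }
}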