Compare commits

..

4 Commits

Author SHA1 Message Date
Clément Renault
6210377b90 Add more logging to the received chunks 2024-01-25 20:16:59 +01:00
Clément Renault
9e3d1e1bbd Remove unused imports 2024-01-25 17:58:47 +01:00
Clément Renault
bceaf4f981 Add a log on the time taken by the incremental facet updating 2024-01-25 17:48:31 +01:00
Clément Renault
d29b301618 Disable the facet search 2024-01-25 17:47:33 +01:00
16 changed files with 154 additions and 419 deletions

30
Cargo.lock generated
View File

@@ -491,7 +491,7 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b"
[[package]]
name = "benchmarks"
version = "1.6.1"
version = "1.6.0"
dependencies = [
"anyhow",
"bytes",
@@ -1402,7 +1402,7 @@ dependencies = [
[[package]]
name = "dump"
version = "1.6.1"
version = "1.6.0"
dependencies = [
"anyhow",
"big_s",
@@ -1634,7 +1634,7 @@ dependencies = [
[[package]]
name = "file-store"
version = "1.6.1"
version = "1.6.0"
dependencies = [
"faux",
"tempfile",
@@ -1656,7 +1656,7 @@ dependencies = [
[[package]]
name = "filter-parser"
version = "1.6.1"
version = "1.6.0"
dependencies = [
"insta",
"nom",
@@ -1687,7 +1687,7 @@ dependencies = [
[[package]]
name = "flatten-serde-json"
version = "1.6.1"
version = "1.6.0"
dependencies = [
"criterion",
"serde_json",
@@ -1805,7 +1805,7 @@ dependencies = [
[[package]]
name = "fuzzers"
version = "1.6.1"
version = "1.6.0"
dependencies = [
"arbitrary",
"clap",
@@ -2763,7 +2763,7 @@ dependencies = [
[[package]]
name = "index-scheduler"
version = "1.6.1"
version = "1.6.0"
dependencies = [
"anyhow",
"big_s",
@@ -2960,7 +2960,7 @@ dependencies = [
[[package]]
name = "json-depth-checker"
version = "1.6.1"
version = "1.6.0"
dependencies = [
"criterion",
"serde_json",
@@ -3472,7 +3472,7 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "meili-snap"
version = "1.6.1"
version = "1.6.0"
dependencies = [
"insta",
"md5",
@@ -3481,7 +3481,7 @@ dependencies = [
[[package]]
name = "meilisearch"
version = "1.6.1"
version = "1.6.0"
dependencies = [
"actix-cors",
"actix-http",
@@ -3572,7 +3572,7 @@ dependencies = [
[[package]]
name = "meilisearch-auth"
version = "1.6.1"
version = "1.6.0"
dependencies = [
"base64 0.21.5",
"enum-iterator",
@@ -3591,7 +3591,7 @@ dependencies = [
[[package]]
name = "meilisearch-types"
version = "1.6.1"
version = "1.6.0"
dependencies = [
"actix-web",
"anyhow",
@@ -3621,7 +3621,7 @@ dependencies = [
[[package]]
name = "meilitool"
version = "1.6.1"
version = "1.6.0"
dependencies = [
"anyhow",
"clap",
@@ -3669,7 +3669,7 @@ dependencies = [
[[package]]
name = "milli"
version = "1.6.1"
version = "1.6.0"
dependencies = [
"arroy",
"big_s",
@@ -4076,7 +4076,7 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
name = "permissive-json-pointer"
version = "1.6.1"
version = "1.6.0"
dependencies = [
"big_s",
"serde_json",

View File

@@ -19,7 +19,7 @@ members = [
]
[workspace.package]
version = "1.6.1"
version = "1.6.0"
authors = ["Quentin de Quelen <quentin@dequelen.me>", "Clément Renault <clement@meilisearch.com>"]
description = "Meilisearch HTTP server"
homepage = "https://meilisearch.com"

View File

@@ -154,5 +154,5 @@ greek = ["meilisearch-types/greek"]
khmer = ["meilisearch-types/khmer"]
[package.metadata.mini-dashboard]
assets-url = "https://github.com/meilisearch/mini-dashboard/releases/download/v0.2.13/build.zip"
sha1 = "e20cc9b390003c6c844f4b8bcc5c5013191a77ff"
assets-url = "https://github.com/meilisearch/mini-dashboard/releases/download/v0.2.12/build.zip"
sha1 = "acfe9a018c93eb0604ea87ee87bff7df5474e18e"

View File

@@ -64,7 +64,7 @@ impl Display for Value {
write!(
f,
"{}",
json_string!(self, { ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]", ".duration" => "[duration]" })
json_string!(self, { ".enqueuedAt" => "[date]", ".processedAt" => "[date]", ".finishedAt" => "[date]", ".duration" => "[duration]" })
)
}
}

View File

@@ -1760,181 +1760,6 @@ async fn add_documents_invalid_geo_field() {
"finishedAt": "[date]"
}
"###);
// The three next tests are related to #4333
// _geo has a lat and lng but set to `null`
let documents = json!([
{
"id": "12",
"_geo": { "lng": null, "lat": 67}
}
]);
let (response, code) = index.add_documents(documents, None).await;
snapshot!(code, @"202 Accepted");
let response = index.wait_task(response.uid()).await;
snapshot!(json_string!(response, { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }),
@r###"
{
"uid": 14,
"indexUid": "test",
"status": "failed",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 1,
"indexedDocuments": 0
},
"error": {
"message": "Could not parse longitude in the document with the id: `12`. Was expecting a finite number but instead got `null`.",
"code": "invalid_document_geo_field",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
},
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"###);
// _geo has a lat and lng but set to `null`
let documents = json!([
{
"id": "12",
"_geo": { "lng": 35, "lat": null }
}
]);
let (response, code) = index.add_documents(documents, None).await;
snapshot!(code, @"202 Accepted");
let response = index.wait_task(response.uid()).await;
snapshot!(json_string!(response, { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }),
@r###"
{
"uid": 15,
"indexUid": "test",
"status": "failed",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 1,
"indexedDocuments": 0
},
"error": {
"message": "Could not parse latitude in the document with the id: `12`. Was expecting a finite number but instead got `null`.",
"code": "invalid_document_geo_field",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
},
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"###);
// _geo has a lat and lng but set to `null`
let documents = json!([
{
"id": "13",
"_geo": { "lng": null, "lat": null }
}
]);
let (response, code) = index.add_documents(documents, None).await;
snapshot!(code, @"202 Accepted");
let response = index.wait_task(response.uid()).await;
snapshot!(json_string!(response, { ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }),
@r###"
{
"uid": 16,
"indexUid": "test",
"status": "failed",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 1,
"indexedDocuments": 0
},
"error": {
"message": "Could not parse latitude nor longitude in the document with the id: `13`. Was expecting finite numbers but instead got `null` and `null`.",
"code": "invalid_document_geo_field",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
},
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"###);
}
// Related to #4333
#[actix_rt::test]
async fn add_invalid_geo_and_then_settings() {
let server = Server::new().await;
let index = server.index("test");
index.create(Some("id")).await;
// _geo is not an object
let documents = json!([
{
"id": "11",
"_geo": { "lat": null, "lng": null },
}
]);
let (ret, code) = index.add_documents(documents, None).await;
snapshot!(code, @"202 Accepted");
let ret = index.wait_task(ret.uid()).await;
snapshot!(ret, @r###"
{
"uid": 1,
"indexUid": "test",
"status": "succeeded",
"type": "documentAdditionOrUpdate",
"canceledBy": null,
"details": {
"receivedDocuments": 1,
"indexedDocuments": 1
},
"error": null,
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"###);
let (ret, code) = index.update_settings(json!({"sortableAttributes": ["_geo"]})).await;
snapshot!(code, @"202 Accepted");
let ret = index.wait_task(ret.uid()).await;
snapshot!(ret, @r###"
{
"uid": 2,
"indexUid": "test",
"status": "failed",
"type": "settingsUpdate",
"canceledBy": null,
"details": {
"sortableAttributes": [
"_geo"
]
},
"error": {
"message": "Could not parse latitude in the document with the id: `\"11\"`. Was expecting a finite number but instead got `null`.",
"code": "invalid_document_geo_field",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_document_geo_field"
},
"duration": "[duration]",
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]"
}
"###);
}
#[actix_rt::test]

View File

@@ -87,52 +87,6 @@ async fn simple_search() {
snapshot!(response["hits"], @r###"[{"title":"Captain Marvel","desc":"a Shazam ersatz","id":"3","_vectors":{"default":[2.0,3.0]},"_semanticScore":0.99029034},{"title":"Captain Planet","desc":"He's not part of the Marvel Cinematic Universe","id":"2","_vectors":{"default":[1.0,2.0]},"_semanticScore":0.97434163},{"title":"Shazam!","desc":"a Captain Marvel ersatz","id":"1","_vectors":{"default":[1.0,3.0]},"_semanticScore":0.9472136}]"###);
}
#[actix_rt::test]
async fn highlighter() {
let server = Server::new().await;
let index = index_with_documents(&server, &SIMPLE_SEARCH_DOCUMENTS).await;
let (response, code) = index
.search_post(json!({"q": "Captain Marvel", "vector": [1.0, 1.0],
"hybrid": {"semanticRatio": 0.2},
"attributesToHighlight": [
"desc"
],
"highlightPreTag": "**BEGIN**",
"highlightPostTag": "**END**"
}))
.await;
snapshot!(code, @"200 OK");
snapshot!(response["hits"], @r###"[{"title":"Captain Marvel","desc":"a Shazam ersatz","id":"3","_vectors":{"default":[2.0,3.0]},"_formatted":{"title":"Captain Marvel","desc":"a Shazam ersatz","id":"3","_vectors":{"default":["2.0","3.0"]}}},{"title":"Shazam!","desc":"a Captain Marvel ersatz","id":"1","_vectors":{"default":[1.0,3.0]},"_formatted":{"title":"Shazam!","desc":"a **BEGIN**Captain**END** **BEGIN**Marvel**END** ersatz","id":"1","_vectors":{"default":["1.0","3.0"]}}},{"title":"Captain Planet","desc":"He's not part of the Marvel Cinematic Universe","id":"2","_vectors":{"default":[1.0,2.0]},"_formatted":{"title":"Captain Planet","desc":"He's not part of the **BEGIN**Marvel**END** Cinematic Universe","id":"2","_vectors":{"default":["1.0","2.0"]}}}]"###);
let (response, code) = index
.search_post(json!({"q": "Captain Marvel", "vector": [1.0, 1.0],
"hybrid": {"semanticRatio": 0.8},
"attributesToHighlight": [
"desc"
],
"highlightPreTag": "**BEGIN**",
"highlightPostTag": "**END**"
}))
.await;
snapshot!(code, @"200 OK");
snapshot!(response["hits"], @r###"[{"title":"Captain Marvel","desc":"a Shazam ersatz","id":"3","_vectors":{"default":[2.0,3.0]},"_formatted":{"title":"Captain Marvel","desc":"a Shazam ersatz","id":"3","_vectors":{"default":["2.0","3.0"]}},"_semanticScore":0.99029034},{"title":"Captain Planet","desc":"He's not part of the Marvel Cinematic Universe","id":"2","_vectors":{"default":[1.0,2.0]},"_formatted":{"title":"Captain Planet","desc":"He's not part of the **BEGIN**Marvel**END** Cinematic Universe","id":"2","_vectors":{"default":["1.0","2.0"]}},"_semanticScore":0.97434163},{"title":"Shazam!","desc":"a Captain Marvel ersatz","id":"1","_vectors":{"default":[1.0,3.0]},"_formatted":{"title":"Shazam!","desc":"a **BEGIN**Captain**END** **BEGIN**Marvel**END** ersatz","id":"1","_vectors":{"default":["1.0","3.0"]}},"_semanticScore":0.9472136}]"###);
// no highlighting on full semantic
let (response, code) = index
.search_post(json!({"q": "Captain Marvel", "vector": [1.0, 1.0],
"hybrid": {"semanticRatio": 1.0},
"attributesToHighlight": [
"desc"
],
"highlightPreTag": "**BEGIN**",
"highlightPostTag": "**END**"
}))
.await;
snapshot!(code, @"200 OK");
snapshot!(response["hits"], @r###"[{"title":"Captain Marvel","desc":"a Shazam ersatz","id":"3","_vectors":{"default":[2.0,3.0]},"_formatted":{"title":"Captain Marvel","desc":"a Shazam ersatz","id":"3","_vectors":{"default":["2.0","3.0"]}},"_semanticScore":0.99029034},{"title":"Captain Planet","desc":"He's not part of the Marvel Cinematic Universe","id":"2","_vectors":{"default":[1.0,2.0]},"_formatted":{"title":"Captain Planet","desc":"He's not part of the Marvel Cinematic Universe","id":"2","_vectors":{"default":["1.0","2.0"]}},"_semanticScore":0.97434163},{"title":"Shazam!","desc":"a Captain Marvel ersatz","id":"1","_vectors":{"default":[1.0,3.0]},"_formatted":{"title":"Shazam!","desc":"a Captain Marvel ersatz","id":"1","_vectors":{"default":["1.0","3.0"]}}}]"###);
}
#[actix_rt::test]
async fn invalid_semantic_ratio() {
let server = Server::new().await;

View File

@@ -102,7 +102,7 @@ impl ScoreWithRatioResult {
}
SearchResult {
matching_words: right.matching_words,
matching_words: left.matching_words,
candidates: left.candidates | right.candidates,
documents_ids,
document_scores,

View File

@@ -61,6 +61,7 @@ impl FacetsUpdateIncremental {
}
}
#[logging_timer::time("FacetsUpdateIncremental::{}")]
pub fn execute(self, wtxn: &mut RwTxn) -> crate::Result<()> {
let mut cursor = self.delta_data.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {

View File

@@ -76,26 +76,18 @@ pub const FACET_MAX_GROUP_SIZE: u8 = 8;
pub const FACET_GROUP_SIZE: u8 = 4;
pub const FACET_MIN_LEVEL_SIZE: u8 = 5;
use std::collections::BTreeSet;
use std::fs::File;
use std::io::BufReader;
use std::iter::FromIterator;
use charabia::normalizer::{Normalize, NormalizerOption};
use grenad::{CompressionType, SortAlgorithm};
use heed::types::{Bytes, DecodeIgnore, SerdeJson};
use heed::BytesEncode;
use log::debug;
use time::OffsetDateTime;
use self::incremental::FacetsUpdateIncremental;
use super::FacetsUpdateBulk;
use crate::facet::FacetType;
use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec, FacetGroupValueCodec};
use crate::heed_codec::facet::{FacetGroupKeyCodec, FacetGroupValueCodec};
use crate::heed_codec::BytesRefCodec;
use crate::update::index_documents::create_sorter;
use crate::update::merge_btreeset_string;
use crate::{BEU16StrCodec, Index, Result, MAX_FACET_VALUE_LENGTH};
use crate::{Index, Result};
pub mod bulk;
pub mod incremental;
@@ -170,91 +162,91 @@ impl<'i> FacetsUpdate<'i> {
incremental_update.execute(wtxn)?;
}
// We clear the list of normalized-for-search facets
// and the previous FSTs to compute everything from scratch
self.index.facet_id_normalized_string_strings.clear(wtxn)?;
self.index.facet_id_string_fst.clear(wtxn)?;
// // We clear the list of normalized-for-search facets
// // and the previous FSTs to compute everything from scratch
// self.index.facet_id_normalized_string_strings.clear(wtxn)?;
// self.index.facet_id_string_fst.clear(wtxn)?;
// As we can't use the same write transaction to read and write in two different databases
// we must create a temporary sorter that we will write into LMDB afterward.
// As multiple unnormalized facet values can become the same normalized facet value
// we must merge them together.
let mut sorter = create_sorter(
SortAlgorithm::Unstable,
merge_btreeset_string,
CompressionType::None,
None,
None,
None,
);
// // As we can't use the same write transaction to read and write in two different databases
// // we must create a temporary sorter that we will write into LMDB afterward.
// // As multiple unnormalized facet values can become the same normalized facet value
// // we must merge them together.
// let mut sorter = create_sorter(
// SortAlgorithm::Unstable,
// merge_btreeset_string,
// CompressionType::None,
// None,
// None,
// None,
// );
// We iterate on the list of original, semi-normalized, facet values
// and normalize them for search, inserting them in LMDB in any given order.
let options = NormalizerOption { lossy: true, ..Default::default() };
let database = self.index.facet_id_string_docids.remap_data_type::<DecodeIgnore>();
for result in database.iter(wtxn)? {
let (facet_group_key, ()) = result?;
if let FacetGroupKey { field_id, level: 0, left_bound } = facet_group_key {
let mut normalized_facet = left_bound.normalize(&options);
let normalized_truncated_facet: String;
if normalized_facet.len() > MAX_FACET_VALUE_LENGTH {
normalized_truncated_facet = normalized_facet
.char_indices()
.take_while(|(idx, _)| *idx < MAX_FACET_VALUE_LENGTH)
.map(|(_, c)| c)
.collect();
normalized_facet = normalized_truncated_facet.into();
}
let set = BTreeSet::from_iter(std::iter::once(left_bound));
let key = (field_id, normalized_facet.as_ref());
let key = BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?;
let val = SerdeJson::bytes_encode(&set).map_err(heed::Error::Encoding)?;
sorter.insert(key, val)?;
}
}
// // We iterate on the list of original, semi-normalized, facet values
// // and normalize them for search, inserting them in LMDB in any given order.
// let options = NormalizerOption { lossy: true, ..Default::default() };
// let database = self.index.facet_id_string_docids.remap_data_type::<DecodeIgnore>();
// for result in database.iter(wtxn)? {
// let (facet_group_key, ()) = result?;
// if let FacetGroupKey { field_id, level: 0, left_bound } = facet_group_key {
// let mut normalized_facet = left_bound.normalize(&options);
// let normalized_truncated_facet: String;
// if normalized_facet.len() > MAX_FACET_VALUE_LENGTH {
// normalized_truncated_facet = normalized_facet
// .char_indices()
// .take_while(|(idx, _)| *idx < MAX_FACET_VALUE_LENGTH)
// .map(|(_, c)| c)
// .collect();
// normalized_facet = normalized_truncated_facet.into();
// }
// let set = BTreeSet::from_iter(std::iter::once(left_bound));
// let key = (field_id, normalized_facet.as_ref());
// let key = BEU16StrCodec::bytes_encode(&key).map_err(heed::Error::Encoding)?;
// let val = SerdeJson::bytes_encode(&set).map_err(heed::Error::Encoding)?;
// sorter.insert(key, val)?;
// }
// }
// In this loop we don't need to take care of merging bitmaps
// as the grenad sorter already merged them for us.
let mut merger_iter = sorter.into_stream_merger_iter()?;
while let Some((key_bytes, btreeset_bytes)) = merger_iter.next()? {
self.index.facet_id_normalized_string_strings.remap_types::<Bytes, Bytes>().put(
wtxn,
key_bytes,
btreeset_bytes,
)?;
}
// // In this loop we don't need to take care of merging bitmaps
// // as the grenad sorter already merged them for us.
// let mut merger_iter = sorter.into_stream_merger_iter()?;
// while let Some((key_bytes, btreeset_bytes)) = merger_iter.next()? {
// self.index.facet_id_normalized_string_strings.remap_types::<Bytes, Bytes>().put(
// wtxn,
// key_bytes,
// btreeset_bytes,
// )?;
// }
// We compute one FST by string facet
let mut text_fsts = vec![];
let mut current_fst: Option<(u16, fst::SetBuilder<Vec<u8>>)> = None;
let database =
self.index.facet_id_normalized_string_strings.remap_data_type::<DecodeIgnore>();
for result in database.iter(wtxn)? {
let ((field_id, normalized_facet), _) = result?;
current_fst = match current_fst.take() {
Some((fid, fst_builder)) if fid != field_id => {
let fst = fst_builder.into_set();
text_fsts.push((fid, fst));
Some((field_id, fst::SetBuilder::memory()))
}
Some((field_id, fst_builder)) => Some((field_id, fst_builder)),
None => Some((field_id, fst::SetBuilder::memory())),
};
// // We compute one FST by string facet
// let mut text_fsts = vec![];
// let mut current_fst: Option<(u16, fst::SetBuilder<Vec<u8>>)> = None;
// let database =
// self.index.facet_id_normalized_string_strings.remap_data_type::<DecodeIgnore>();
// for result in database.iter(wtxn)? {
// let ((field_id, normalized_facet), _) = result?;
// current_fst = match current_fst.take() {
// Some((fid, fst_builder)) if fid != field_id => {
// let fst = fst_builder.into_set();
// text_fsts.push((fid, fst));
// Some((field_id, fst::SetBuilder::memory()))
// }
// Some((field_id, fst_builder)) => Some((field_id, fst_builder)),
// None => Some((field_id, fst::SetBuilder::memory())),
// };
if let Some((_, fst_builder)) = current_fst.as_mut() {
fst_builder.insert(normalized_facet)?;
}
}
// if let Some((_, fst_builder)) = current_fst.as_mut() {
// fst_builder.insert(normalized_facet)?;
// }
// }
if let Some((field_id, fst_builder)) = current_fst {
let fst = fst_builder.into_set();
text_fsts.push((field_id, fst));
}
// if let Some((field_id, fst_builder)) = current_fst {
// let fst = fst_builder.into_set();
// text_fsts.push((field_id, fst));
// }
// We write those FSTs in LMDB now
for (field_id, fst) in text_fsts {
self.index.facet_id_string_fst.put(wtxn, &field_id, &fst)?;
}
// // We write those FSTs in LMDB now
// for (field_id, fst) in text_fsts {
// self.index.facet_id_string_fst.put(wtxn, &field_id, &fst)?;
// }
Ok(())
}

View File

@@ -34,9 +34,7 @@ pub fn extract_geo_points<R: io::Read + io::Seek>(
// since we only need the primary key when we throw an error
// we create this getter to lazily get it when needed
let document_id = || -> Value {
let reader = KvReaderDelAdd::new(obkv.get(primary_key_id).unwrap());
let document_id =
reader.get(DelAdd::Deletion).or(reader.get(DelAdd::Addition)).unwrap();
let document_id = obkv.get(primary_key_id).unwrap();
serde_json::from_slice(document_id).unwrap()
};

View File

@@ -339,7 +339,9 @@ pub fn extract_embeddings<R: io::Read + io::Seek>(
indexer: GrenadParameters,
embedder: Arc<Embedder>,
) -> Result<grenad::Reader<BufReader<File>>> {
let n_chunks = embedder.chunk_count_hint(); // chunk level parallelism
let rt = tokio::runtime::Builder::new_current_thread().enable_io().enable_time().build()?;
let n_chunks = embedder.chunk_count_hint(); // chunk level parellelism
let n_vectors_per_chunk = embedder.prompt_count_in_chunk_hint(); // number of vectors in a single chunk
// docid, state with embedding
@@ -373,8 +375,11 @@ pub fn extract_embeddings<R: io::Read + io::Seek>(
current_chunk_ids.push(docid);
if chunks.len() == chunks.capacity() {
let chunked_embeds = embedder
.embed_chunks(std::mem::replace(&mut chunks, Vec::with_capacity(n_chunks)))
let chunked_embeds = rt
.block_on(
embedder
.embed_chunks(std::mem::replace(&mut chunks, Vec::with_capacity(n_chunks))),
)
.map_err(crate::vector::Error::from)
.map_err(crate::Error::from)?;
@@ -391,8 +396,8 @@ pub fn extract_embeddings<R: io::Read + io::Seek>(
// send last chunk
if !chunks.is_empty() {
let chunked_embeds = embedder
.embed_chunks(std::mem::take(&mut chunks))
let chunked_embeds = rt
.block_on(embedder.embed_chunks(std::mem::take(&mut chunks)))
.map_err(crate::vector::Error::from)
.map_err(crate::Error::from)?;
for (docid, embeddings) in chunks_ids
@@ -405,15 +410,13 @@ pub fn extract_embeddings<R: io::Read + io::Seek>(
}
if !current_chunk.is_empty() {
let embeds = embedder
.embed_chunks(vec![std::mem::take(&mut current_chunk)])
let embeds = rt
.block_on(embedder.embed(std::mem::take(&mut current_chunk)))
.map_err(crate::vector::Error::from)
.map_err(crate::Error::from)?;
if let Some(embeds) = embeds.first() {
for (docid, embeddings) in current_chunk_ids.iter().zip(embeds.iter()) {
state_writer.insert(docid.to_be_bytes(), cast_slice(embeddings.as_inner()))?;
}
for (docid, embeddings) in current_chunk_ids.iter().zip(embeds.iter()) {
state_writer.insert(docid.to_be_bytes(), cast_slice(embeddings.as_inner()))?;
}
}

View File

@@ -123,6 +123,8 @@ pub(crate) fn write_typed_chunk_into_index(
) -> Result<(RoaringBitmap, bool)> {
puffin::profile_function!(typed_chunk.to_debug_string());
log::debug!("Received a chunk to process: {}", typed_chunk.to_debug_string());
let mut is_merged_database = false;
match typed_chunk {
TypedChunk::Documents(obkv_documents_iter) => {

View File

@@ -67,10 +67,6 @@ pub enum EmbedErrorKind {
OpenAiUnhandledStatusCode(u16),
#[error("attempt to embed the following text in a configuration where embeddings must be user provided: {0:?}")]
ManualEmbed(String),
#[error("could not initialize asynchronous runtime: {0}")]
OpenAiRuntimeInit(std::io::Error),
#[error("initializing web client for sending embedding requests failed: {0}")]
InitWebClient(reqwest::Error),
}
impl EmbedError {
@@ -121,14 +117,6 @@ impl EmbedError {
pub(crate) fn embed_on_manual_embedder(texts: String) -> EmbedError {
Self { kind: EmbedErrorKind::ManualEmbed(texts), fault: FaultSource::User }
}
pub(crate) fn openai_runtime_init(inner: std::io::Error) -> EmbedError {
Self { kind: EmbedErrorKind::OpenAiRuntimeInit(inner), fault: FaultSource::Runtime }
}
pub fn openai_initialize_web_client(inner: reqwest::Error) -> Self {
Self { kind: EmbedErrorKind::InitWebClient(inner), fault: FaultSource::Runtime }
}
}
#[derive(Debug, thiserror::Error)]
@@ -195,6 +183,10 @@ impl NewEmbedderError {
}
}
pub fn openai_initialize_web_client(inner: reqwest::Error) -> Self {
Self { kind: NewEmbedderErrorKind::InitWebClient(inner), fault: FaultSource::Runtime }
}
pub fn openai_invalid_api_key_format(inner: reqwest::header::InvalidHeaderValue) -> Self {
Self { kind: NewEmbedderErrorKind::InvalidApiKeyFormat(inner), fault: FaultSource::User }
}
@@ -245,6 +237,8 @@ pub enum NewEmbedderErrorKind {
#[error("loading model failed: {0}")]
LoadModel(candle_core::Error),
// openai
#[error("initializing web client for sending embedding requests failed: {0}")]
InitWebClient(reqwest::Error),
#[error("The API key passed to Authorization error was in an invalid format: {0}")]
InvalidApiKeyFormat(reqwest::header::InvalidHeaderValue),
}

View File

@@ -145,8 +145,7 @@ impl Embedder {
let token_ids = tokens
.iter()
.map(|tokens| {
let mut tokens = tokens.get_ids().to_vec();
tokens.truncate(512);
let tokens = tokens.get_ids().to_vec();
Tensor::new(tokens.as_slice(), &self.model.device).map_err(EmbedError::tensor_shape)
})
.collect::<Result<Vec<_>, EmbedError>>()?;

View File

@@ -163,24 +163,18 @@ impl Embedder {
) -> std::result::Result<Vec<Embeddings<f32>>, EmbedError> {
match self {
Embedder::HuggingFace(embedder) => embedder.embed(texts),
Embedder::OpenAi(embedder) => {
let client = embedder.new_client()?;
embedder.embed(texts, &client).await
}
Embedder::OpenAi(embedder) => embedder.embed(texts).await,
Embedder::UserProvided(embedder) => embedder.embed(texts),
}
}
/// # Panics
///
/// - if called from an asynchronous context
pub fn embed_chunks(
pub async fn embed_chunks(
&self,
text_chunks: Vec<Vec<String>>,
) -> std::result::Result<Vec<Vec<Embeddings<f32>>>, EmbedError> {
match self {
Embedder::HuggingFace(embedder) => embedder.embed_chunks(text_chunks),
Embedder::OpenAi(embedder) => embedder.embed_chunks(text_chunks),
Embedder::OpenAi(embedder) => embedder.embed_chunks(text_chunks).await,
Embedder::UserProvided(embedder) => embedder.embed_chunks(text_chunks),
}
}

View File

@@ -8,7 +8,7 @@ use super::{DistributionShift, Embedding, Embeddings};
#[derive(Debug)]
pub struct Embedder {
headers: reqwest::header::HeaderMap,
client: reqwest::Client,
tokenizer: tiktoken_rs::CoreBPE,
options: EmbedderOptions,
}
@@ -95,13 +95,6 @@ impl EmbedderOptions {
}
impl Embedder {
pub fn new_client(&self) -> Result<reqwest::Client, EmbedError> {
reqwest::ClientBuilder::new()
.default_headers(self.headers.clone())
.build()
.map_err(EmbedError::openai_initialize_web_client)
}
pub fn new(options: EmbedderOptions) -> Result<Self, NewEmbedderError> {
let mut headers = reqwest::header::HeaderMap::new();
let mut inferred_api_key = Default::default();
@@ -118,25 +111,25 @@ impl Embedder {
reqwest::header::CONTENT_TYPE,
reqwest::header::HeaderValue::from_static("application/json"),
);
let client = reqwest::ClientBuilder::new()
.default_headers(headers)
.build()
.map_err(NewEmbedderError::openai_initialize_web_client)?;
// looking at the code it is very unclear that this can actually fail.
let tokenizer = tiktoken_rs::cl100k_base().unwrap();
Ok(Self { options, headers, tokenizer })
Ok(Self { options, client, tokenizer })
}
pub async fn embed(
&self,
texts: Vec<String>,
client: &reqwest::Client,
) -> Result<Vec<Embeddings<f32>>, EmbedError> {
pub async fn embed(&self, texts: Vec<String>) -> Result<Vec<Embeddings<f32>>, EmbedError> {
let mut tokenized = false;
for attempt in 0..7 {
let result = if tokenized {
self.try_embed_tokenized(&texts, client).await
self.try_embed_tokenized(&texts).await
} else {
self.try_embed(&texts, client).await
self.try_embed(&texts).await
};
let retry_duration = match result {
@@ -152,9 +145,9 @@ impl Embedder {
}
let result = if tokenized {
self.try_embed_tokenized(&texts, client).await
self.try_embed_tokenized(&texts).await
} else {
self.try_embed(&texts, client).await
self.try_embed(&texts).await
};
result.map_err(Retry::into_error)
@@ -232,13 +225,13 @@ impl Embedder {
async fn try_embed<S: AsRef<str> + serde::Serialize>(
&self,
texts: &[S],
client: &reqwest::Client,
) -> Result<Vec<Embeddings<f32>>, Retry> {
for text in texts {
log::trace!("Received prompt: {}", text.as_ref())
}
let request = OpenAiRequest { model: self.options.embedding_model.name(), input: texts };
let response = client
let response = self
.client
.post(OPENAI_EMBEDDINGS_URL)
.json(&request)
.send()
@@ -263,11 +256,7 @@ impl Embedder {
.collect())
}
async fn try_embed_tokenized(
&self,
text: &[String],
client: &reqwest::Client,
) -> Result<Vec<Embeddings<f32>>, Retry> {
async fn try_embed_tokenized(&self, text: &[String]) -> Result<Vec<Embeddings<f32>>, Retry> {
pub const OVERLAP_SIZE: usize = 200;
let mut all_embeddings = Vec::with_capacity(text.len());
for text in text {
@@ -275,7 +264,7 @@ impl Embedder {
let encoded = self.tokenizer.encode_ordinary(text.as_str());
let len = encoded.len();
if len < max_token_count {
all_embeddings.append(&mut self.try_embed(&[text], client).await?);
all_embeddings.append(&mut self.try_embed(&[text]).await?);
continue;
}
@@ -284,26 +273,22 @@ impl Embedder {
Embeddings::new(self.options.embedding_model.dimensions());
while tokens.len() > max_token_count {
let window = &tokens[..max_token_count];
embeddings_for_prompt.push(self.embed_tokens(window, client).await?).unwrap();
embeddings_for_prompt.push(self.embed_tokens(window).await?).unwrap();
tokens = &tokens[max_token_count - OVERLAP_SIZE..];
}
// end of text
embeddings_for_prompt.push(self.embed_tokens(tokens, client).await?).unwrap();
embeddings_for_prompt.push(self.embed_tokens(tokens).await?).unwrap();
all_embeddings.push(embeddings_for_prompt);
}
Ok(all_embeddings)
}
async fn embed_tokens(
&self,
tokens: &[usize],
client: &reqwest::Client,
) -> Result<Embedding, Retry> {
async fn embed_tokens(&self, tokens: &[usize]) -> Result<Embedding, Retry> {
for attempt in 0..9 {
let duration = match self.try_embed_tokens(tokens, client).await {
let duration = match self.try_embed_tokens(tokens).await {
Ok(embedding) => return Ok(embedding),
Err(retry) => retry.into_duration(attempt),
}
@@ -312,19 +297,14 @@ impl Embedder {
tokio::time::sleep(duration).await;
}
self.try_embed_tokens(tokens, client)
.await
.map_err(|retry| Retry::give_up(retry.into_error()))
self.try_embed_tokens(tokens).await.map_err(|retry| Retry::give_up(retry.into_error()))
}
async fn try_embed_tokens(
&self,
tokens: &[usize],
client: &reqwest::Client,
) -> Result<Embedding, Retry> {
async fn try_embed_tokens(&self, tokens: &[usize]) -> Result<Embedding, Retry> {
let request =
OpenAiTokensRequest { model: self.options.embedding_model.name(), input: tokens };
let response = client
let response = self
.client
.post(OPENAI_EMBEDDINGS_URL)
.json(&request)
.send()
@@ -342,19 +322,12 @@ impl Embedder {
Ok(response.data.pop().map(|data| data.embedding).unwrap_or_default())
}
pub fn embed_chunks(
pub async fn embed_chunks(
&self,
text_chunks: Vec<Vec<String>>,
) -> Result<Vec<Vec<Embeddings<f32>>>, EmbedError> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_io()
.enable_time()
.build()
.map_err(EmbedError::openai_runtime_init)?;
let client = self.new_client()?;
rt.block_on(futures::future::try_join_all(
text_chunks.into_iter().map(|prompts| self.embed(prompts, &client)),
))
futures::future::try_join_all(text_chunks.into_iter().map(|prompts| self.embed(prompts)))
.await
}
pub fn chunk_count_hint(&self) -> usize {