mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-10-23 20:16:29 +00:00
Merge #4644
4644: Revert "Stream documents" and keep heed+arroy to the latest verion r=Kerollmops a=irevoire Reverts meilisearch/meilisearch#4544 Fixes https://github.com/meilisearch/meilisearch/issues/4641 I didn’t realize that some http clients were not handling chunked http requests like you would expect (if you ask the body, it gives you the body), which made the previous PR breaking. There is no way to provide a good fix to the issue we initially wanted to fix without breaking meilisearch and that’s not planned for now. Co-authored-by: Tamo <irevoire@protonmail.ch> Co-authored-by: Tamo <tamo@meilisearch.com>
This commit is contained in:
12
Cargo.lock
generated
12
Cargo.lock
generated
@@ -3346,7 +3346,6 @@ dependencies = [
|
|||||||
"rayon",
|
"rayon",
|
||||||
"regex",
|
"regex",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
"roaring",
|
|
||||||
"rustls 0.21.12",
|
"rustls 0.21.12",
|
||||||
"rustls-pemfile",
|
"rustls-pemfile",
|
||||||
"segment",
|
"segment",
|
||||||
@@ -4415,6 +4414,12 @@ dependencies = [
|
|||||||
"winreg",
|
"winreg",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "retain_mut"
|
||||||
|
version = "0.1.7"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "8c31b5c4033f8fdde8700e4657be2c497e7288f01515be52168c631e2e4d4086"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ring"
|
name = "ring"
|
||||||
version = "0.17.8"
|
version = "0.17.8"
|
||||||
@@ -4432,12 +4437,13 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "roaring"
|
name = "roaring"
|
||||||
version = "0.10.3"
|
version = "0.10.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "a1c77081a55300e016cb86f2864415b7518741879db925b8d488a0ee0d2da6bf"
|
checksum = "6106b5cf8587f5834158895e9715a3c6c9716c8aefab57f1f7680917191c7873"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bytemuck",
|
"bytemuck",
|
||||||
"byteorder",
|
"byteorder",
|
||||||
|
"retain_mut",
|
||||||
"serde",
|
"serde",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@@ -108,7 +108,6 @@ tracing-subscriber = { version = "0.3.18", features = ["json"] }
|
|||||||
tracing-trace = { version = "0.1.0", path = "../tracing-trace" }
|
tracing-trace = { version = "0.1.0", path = "../tracing-trace" }
|
||||||
tracing-actix-web = "0.7.9"
|
tracing-actix-web = "0.7.9"
|
||||||
build-info = { version = "1.7.0", path = "../build-info" }
|
build-info = { version = "1.7.0", path = "../build-info" }
|
||||||
roaring = "0.10.3"
|
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-rt = "2.9.0"
|
actix-rt = "2.9.0"
|
||||||
|
@@ -1,14 +1,12 @@
|
|||||||
use std::io::{ErrorKind, Write};
|
use std::io::ErrorKind;
|
||||||
|
|
||||||
use actix_web::http::header::CONTENT_TYPE;
|
use actix_web::http::header::CONTENT_TYPE;
|
||||||
use actix_web::web::Data;
|
use actix_web::web::Data;
|
||||||
use actix_web::{web, HttpMessage, HttpRequest, HttpResponse};
|
use actix_web::{web, HttpMessage, HttpRequest, HttpResponse};
|
||||||
use bstr::ByteSlice as _;
|
use bstr::ByteSlice as _;
|
||||||
use bytes::Bytes;
|
|
||||||
use deserr::actix_web::{AwebJson, AwebQueryParameter};
|
use deserr::actix_web::{AwebJson, AwebQueryParameter};
|
||||||
use deserr::Deserr;
|
use deserr::Deserr;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use futures_util::Stream;
|
|
||||||
use index_scheduler::{IndexScheduler, TaskId};
|
use index_scheduler::{IndexScheduler, TaskId};
|
||||||
use meilisearch_types::deserr::query_params::Param;
|
use meilisearch_types::deserr::query_params::Param;
|
||||||
use meilisearch_types::deserr::{DeserrJsonError, DeserrQueryParamError};
|
use meilisearch_types::deserr::{DeserrJsonError, DeserrQueryParamError};
|
||||||
@@ -24,9 +22,7 @@ use meilisearch_types::tasks::KindWithContent;
|
|||||||
use meilisearch_types::{milli, Document, Index};
|
use meilisearch_types::{milli, Document, Index};
|
||||||
use mime::Mime;
|
use mime::Mime;
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use roaring::RoaringBitmap;
|
use serde::Deserialize;
|
||||||
use serde::ser::SerializeSeq;
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
use tempfile::tempfile;
|
use tempfile::tempfile;
|
||||||
use tokio::fs::File;
|
use tokio::fs::File;
|
||||||
@@ -234,34 +230,6 @@ pub async fn get_documents(
|
|||||||
documents_by_query(&index_scheduler, index_uid, query)
|
documents_by_query(&index_scheduler, index_uid, query)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Writer2Streamer {
|
|
||||||
sender: tokio::sync::mpsc::Sender<Result<Bytes, anyhow::Error>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Write for Writer2Streamer {
|
|
||||||
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
|
||||||
self.sender.blocking_send(Ok(buf.to_vec().into())).map_err(std::io::Error::other)?;
|
|
||||||
Ok(buf.len())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn flush(&mut self) -> std::io::Result<()> {
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn stream(
|
|
||||||
data: impl Serialize + Send + 'static,
|
|
||||||
) -> impl Stream<Item = Result<Bytes, anyhow::Error>> {
|
|
||||||
let (sender, receiver) = tokio::sync::mpsc::channel::<Result<Bytes, anyhow::Error>>(1);
|
|
||||||
|
|
||||||
tokio::task::spawn_blocking(move || {
|
|
||||||
serde_json::to_writer(std::io::BufWriter::new(Writer2Streamer { sender }), &data)
|
|
||||||
});
|
|
||||||
futures_util::stream::unfold(receiver, |mut receiver| async {
|
|
||||||
receiver.recv().await.map(|value| (value, receiver))
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn documents_by_query(
|
fn documents_by_query(
|
||||||
index_scheduler: &IndexScheduler,
|
index_scheduler: &IndexScheduler,
|
||||||
index_uid: web::Path<String>,
|
index_uid: web::Path<String>,
|
||||||
@@ -271,13 +239,12 @@ fn documents_by_query(
|
|||||||
let BrowseQuery { offset, limit, fields, filter } = query;
|
let BrowseQuery { offset, limit, fields, filter } = query;
|
||||||
|
|
||||||
let index = index_scheduler.index(&index_uid)?;
|
let index = index_scheduler.index(&index_uid)?;
|
||||||
let documents = retrieve_documents(index, offset, limit, filter, fields)?;
|
let (total, documents) = retrieve_documents(&index, offset, limit, filter, fields)?;
|
||||||
|
|
||||||
let ret = PaginationView::new(offset, limit, documents.total_documents as usize, documents);
|
let ret = PaginationView::new(offset, limit, total as usize, documents);
|
||||||
|
|
||||||
debug!(returns = ?ret, "Get documents");
|
debug!(returns = ?ret, "Get documents");
|
||||||
|
Ok(HttpResponse::Ok().json(ret))
|
||||||
Ok(HttpResponse::Ok().streaming(stream(ret)))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Deserialize, Debug, Deserr)]
|
#[derive(Deserialize, Debug, Deserr)]
|
||||||
@@ -623,47 +590,14 @@ fn some_documents<'a, 't: 'a>(
|
|||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct DocumentsStreamer {
|
fn retrieve_documents<S: AsRef<str>>(
|
||||||
attributes_to_retrieve: Option<Vec<String>>,
|
index: &Index,
|
||||||
documents: RoaringBitmap,
|
|
||||||
rtxn: RoTxn<'static>,
|
|
||||||
index: Index,
|
|
||||||
pub total_documents: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Serialize for DocumentsStreamer {
|
|
||||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
|
||||||
where
|
|
||||||
S: serde::Serializer,
|
|
||||||
{
|
|
||||||
let mut seq = serializer.serialize_seq(Some(self.documents.len() as usize)).unwrap();
|
|
||||||
|
|
||||||
let documents = some_documents(&self.index, &self.rtxn, self.documents.iter()).unwrap();
|
|
||||||
for document in documents {
|
|
||||||
let document = document.unwrap();
|
|
||||||
let document = match self.attributes_to_retrieve {
|
|
||||||
Some(ref attributes_to_retrieve) => permissive_json_pointer::select_values(
|
|
||||||
&document,
|
|
||||||
attributes_to_retrieve.iter().map(|s| s.as_ref()),
|
|
||||||
),
|
|
||||||
None => document,
|
|
||||||
};
|
|
||||||
|
|
||||||
seq.serialize_element(&document)?;
|
|
||||||
}
|
|
||||||
seq.end()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn retrieve_documents(
|
|
||||||
index: Index,
|
|
||||||
offset: usize,
|
offset: usize,
|
||||||
limit: usize,
|
limit: usize,
|
||||||
filter: Option<Value>,
|
filter: Option<Value>,
|
||||||
attributes_to_retrieve: Option<Vec<String>>,
|
attributes_to_retrieve: Option<Vec<S>>,
|
||||||
) -> Result<DocumentsStreamer, ResponseError> {
|
) -> Result<(u64, Vec<Document>), ResponseError> {
|
||||||
let rtxn = index.static_read_txn()?;
|
let rtxn = index.read_txn()?;
|
||||||
|
|
||||||
let filter = &filter;
|
let filter = &filter;
|
||||||
let filter = if let Some(filter) = filter {
|
let filter = if let Some(filter) = filter {
|
||||||
parse_filter(filter)
|
parse_filter(filter)
|
||||||
@@ -673,7 +607,7 @@ fn retrieve_documents(
|
|||||||
};
|
};
|
||||||
|
|
||||||
let candidates = if let Some(filter) = filter {
|
let candidates = if let Some(filter) = filter {
|
||||||
filter.evaluate(&rtxn, &index).map_err(|err| match err {
|
filter.evaluate(&rtxn, index).map_err(|err| match err {
|
||||||
milli::Error::UserError(milli::UserError::InvalidFilter(_)) => {
|
milli::Error::UserError(milli::UserError::InvalidFilter(_)) => {
|
||||||
ResponseError::from_msg(err.to_string(), Code::InvalidDocumentFilter)
|
ResponseError::from_msg(err.to_string(), Code::InvalidDocumentFilter)
|
||||||
}
|
}
|
||||||
@@ -683,13 +617,27 @@ fn retrieve_documents(
|
|||||||
index.documents_ids(&rtxn)?
|
index.documents_ids(&rtxn)?
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(DocumentsStreamer {
|
let (it, number_of_documents) = {
|
||||||
total_documents: candidates.len(),
|
let number_of_documents = candidates.len();
|
||||||
attributes_to_retrieve,
|
(
|
||||||
documents: candidates.into_iter().skip(offset).take(limit).collect(),
|
some_documents(index, &rtxn, candidates.into_iter().skip(offset).take(limit))?,
|
||||||
rtxn,
|
number_of_documents,
|
||||||
index,
|
)
|
||||||
|
};
|
||||||
|
|
||||||
|
let documents: Result<Vec<_>, ResponseError> = it
|
||||||
|
.map(|document| {
|
||||||
|
Ok(match &attributes_to_retrieve {
|
||||||
|
Some(attributes_to_retrieve) => permissive_json_pointer::select_values(
|
||||||
|
&document?,
|
||||||
|
attributes_to_retrieve.iter().map(|s| s.as_ref()),
|
||||||
|
),
|
||||||
|
None => document?,
|
||||||
})
|
})
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
Ok((number_of_documents, documents?))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn retrieve_document<S: AsRef<str>>(
|
fn retrieve_document<S: AsRef<str>>(
|
||||||
|
@@ -1,5 +1,4 @@
|
|||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
use std::fmt;
|
|
||||||
|
|
||||||
use actix_web::web::Data;
|
use actix_web::web::Data;
|
||||||
use actix_web::{web, HttpRequest, HttpResponse};
|
use actix_web::{web, HttpRequest, HttpResponse};
|
||||||
@@ -125,31 +124,20 @@ pub struct Pagination {
|
|||||||
pub limit: usize,
|
pub limit: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Serialize)]
|
#[derive(Debug, Clone, Serialize)]
|
||||||
pub struct PaginationView<T: Serialize> {
|
pub struct PaginationView<T> {
|
||||||
pub results: T,
|
pub results: Vec<T>,
|
||||||
pub offset: usize,
|
pub offset: usize,
|
||||||
pub limit: usize,
|
pub limit: usize,
|
||||||
pub total: usize,
|
pub total: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Serialize> fmt::Debug for PaginationView<T> {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
||||||
f.debug_struct("PaginationView")
|
|
||||||
.field("offset", &self.offset)
|
|
||||||
.field("limit", &self.limit)
|
|
||||||
.field("total", &self.total)
|
|
||||||
.field("results", &"[...]")
|
|
||||||
.finish()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Pagination {
|
impl Pagination {
|
||||||
/// Given the full data to paginate, returns the selected section.
|
/// Given the full data to paginate, returns the selected section.
|
||||||
pub fn auto_paginate_sized<T>(
|
pub fn auto_paginate_sized<T>(
|
||||||
self,
|
self,
|
||||||
content: impl IntoIterator<Item = T> + ExactSizeIterator,
|
content: impl IntoIterator<Item = T> + ExactSizeIterator,
|
||||||
) -> PaginationView<Vec<T>>
|
) -> PaginationView<T>
|
||||||
where
|
where
|
||||||
T: Serialize,
|
T: Serialize,
|
||||||
{
|
{
|
||||||
@@ -163,7 +151,7 @@ impl Pagination {
|
|||||||
self,
|
self,
|
||||||
total: usize,
|
total: usize,
|
||||||
content: impl IntoIterator<Item = T>,
|
content: impl IntoIterator<Item = T>,
|
||||||
) -> PaginationView<Vec<T>>
|
) -> PaginationView<T>
|
||||||
where
|
where
|
||||||
T: Serialize,
|
T: Serialize,
|
||||||
{
|
{
|
||||||
@@ -173,7 +161,7 @@ impl Pagination {
|
|||||||
|
|
||||||
/// Given the data already paginated + the total number of elements, it stores
|
/// Given the data already paginated + the total number of elements, it stores
|
||||||
/// everything in a [PaginationResult].
|
/// everything in a [PaginationResult].
|
||||||
pub fn format_with<T>(self, total: usize, results: Vec<T>) -> PaginationView<Vec<T>>
|
pub fn format_with<T>(self, total: usize, results: Vec<T>) -> PaginationView<T>
|
||||||
where
|
where
|
||||||
T: Serialize,
|
T: Serialize,
|
||||||
{
|
{
|
||||||
@@ -181,8 +169,8 @@ impl Pagination {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Serialize> PaginationView<T> {
|
impl<T> PaginationView<T> {
|
||||||
pub fn new(offset: usize, limit: usize, total: usize, results: T) -> Self {
|
pub fn new(offset: usize, limit: usize, total: usize, results: Vec<T>) -> Self {
|
||||||
Self { offset, limit, results, total }
|
Self { offset, limit, results, total }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user