mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-07-19 04:50:37 +00:00
Compare commits
49 Commits
prototype-
...
prototype-
Author | SHA1 | Date | |
---|---|---|---|
6243422ff4 | |||
3b69233906 | |||
e9b62aacb3 | |||
456960d2c7 | |||
3dda176723 | |||
af0f6f0bf0 | |||
ccf3ba3f32 | |||
65528a3e06 | |||
cdb4b3e024 | |||
8c0ebd1331 | |||
5130e06b41 | |||
08e27ef73f | |||
914b125c5f | |||
717b069907 | |||
7ea154673a | |||
b947f3bb9d | |||
4c35817c5f | |||
c53841e166 | |||
fd81945597 | |||
794e491152 | |||
cab27c2ab4 | |||
624fa9052f | |||
359ede4862 | |||
60c11dbdbd | |||
dacee40ebc | |||
6089083a8e | |||
cc2c19d4c3 | |||
a5c56fac8a | |||
e4e49e63d0 | |||
00bd7bd19a | |||
ef3d098b4d | |||
8084cf29f3 | |||
5a7c1bde84 | |||
6b2d671be7 | |||
43c13faeda | |||
29adfc2f68 | |||
064ee95b1c | |||
604d533b31 | |||
44c1900f36 | |||
04671d0751 | |||
4f4c669d50 | |||
35758db9ec | |||
4988199bb9 | |||
83991ee770 | |||
9d061cec26 | |||
fe819a9d80 | |||
e338ceb97f | |||
75c87d5391 | |||
dd57873f8e |
20
.github/workflows/test-suite.yml
vendored
20
.github/workflows/test-suite.yml
vendored
@ -37,13 +37,13 @@ jobs:
|
||||
toolchain: stable
|
||||
override: true
|
||||
- name: Setup test with Rust nightly
|
||||
if: github.event_name == 'schedule'
|
||||
if: github.event_name == 'schedule' || github.event_name == 'workflow_dispatch'
|
||||
uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
toolchain: nightly
|
||||
override: true
|
||||
- name: Cache dependencies
|
||||
uses: Swatinem/rust-cache@v2.5.1
|
||||
uses: Swatinem/rust-cache@v2.6.2
|
||||
- name: Run cargo check without any default features
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
@ -65,7 +65,7 @@ jobs:
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- name: Cache dependencies
|
||||
uses: Swatinem/rust-cache@v2.5.1
|
||||
uses: Swatinem/rust-cache@v2.6.2
|
||||
- name: Run cargo check without any default features
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
@ -78,12 +78,12 @@ jobs:
|
||||
args: --locked --release --all
|
||||
|
||||
test-all-features:
|
||||
name: Tests all features on cron schedule only
|
||||
name: Tests all features
|
||||
runs-on: ubuntu-latest
|
||||
container:
|
||||
# Use ubuntu-18.04 to compile with glibc 2.27, which are the production expectations
|
||||
image: ubuntu:18.04
|
||||
if: github.event_name == 'schedule'
|
||||
if: github.event_name == 'schedule' || github.event_name == 'workflow_dispatch'
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- name: Install needed dependencies
|
||||
@ -110,7 +110,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
container:
|
||||
image: ubuntu:18.04
|
||||
if: github.event_name == 'schedule'
|
||||
if: github.event_name == 'schedule' || github.event_name == 'workflow_dispatch'
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- name: Install needed dependencies
|
||||
@ -146,7 +146,7 @@ jobs:
|
||||
toolchain: stable
|
||||
override: true
|
||||
- name: Cache dependencies
|
||||
uses: Swatinem/rust-cache@v2.5.1
|
||||
uses: Swatinem/rust-cache@v2.6.2
|
||||
- name: Run tests in debug
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
@ -161,11 +161,11 @@ jobs:
|
||||
- uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
profile: minimal
|
||||
toolchain: 1.69.0
|
||||
toolchain: 1.71.1
|
||||
override: true
|
||||
components: clippy
|
||||
- name: Cache dependencies
|
||||
uses: Swatinem/rust-cache@v2.5.1
|
||||
uses: Swatinem/rust-cache@v2.6.2
|
||||
- name: Run cargo clippy
|
||||
uses: actions-rs/cargo@v1
|
||||
with:
|
||||
@ -184,7 +184,7 @@ jobs:
|
||||
override: true
|
||||
components: rustfmt
|
||||
- name: Cache dependencies
|
||||
uses: Swatinem/rust-cache@v2.5.1
|
||||
uses: Swatinem/rust-cache@v2.6.2
|
||||
- name: Run cargo fmt
|
||||
# Since we never ran the `build.rs` script in the benchmark directory we are missing one auto-generated import file.
|
||||
# Since we want to trigger (and fail) this action as fast as possible, instead of building the benchmark crate
|
||||
|
662
Cargo.lock
generated
662
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -18,7 +18,7 @@ members = [
|
||||
]
|
||||
|
||||
[workspace.package]
|
||||
version = "1.3.0"
|
||||
version = "1.4.0"
|
||||
authors = ["Quentin de Quelen <quentin@dequelen.me>", "Clément Renault <clement@meilisearch.com>"]
|
||||
description = "Meilisearch HTTP server"
|
||||
homepage = "https://meilisearch.com"
|
||||
|
@ -7,7 +7,7 @@ use meilisearch_types::milli::update::IndexDocumentsMethod;
|
||||
use meilisearch_types::settings::Unchecked;
|
||||
use meilisearch_types::tasks::{Details, IndexSwap, KindWithContent, Status, Task, TaskId};
|
||||
use meilisearch_types::InstanceUid;
|
||||
use roaring::RoaringBitmap;
|
||||
use roaring::RoaringTreemap;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use time::OffsetDateTime;
|
||||
|
||||
@ -121,11 +121,11 @@ pub enum KindDump {
|
||||
},
|
||||
TaskCancelation {
|
||||
query: String,
|
||||
tasks: RoaringBitmap,
|
||||
tasks: RoaringTreemap,
|
||||
},
|
||||
TasksDeletion {
|
||||
query: String,
|
||||
tasks: RoaringBitmap,
|
||||
tasks: RoaringTreemap,
|
||||
},
|
||||
DumpCreation {
|
||||
keys: Vec<Key>,
|
||||
|
@ -69,7 +69,7 @@ impl CompatV5ToV6 {
|
||||
}
|
||||
|
||||
let task = v6::Task {
|
||||
uid: task_view.uid,
|
||||
uid: task_view.uid as u64,
|
||||
index_uid: task_view.index_uid,
|
||||
status: match task_view.status {
|
||||
v5::Status::Enqueued => v6::Status::Enqueued,
|
||||
|
@ -0,0 +1,24 @@
|
||||
---
|
||||
source: dump/src/reader/mod.rs
|
||||
expression: spells.settings().unwrap()
|
||||
---
|
||||
{
|
||||
"displayedAttributes": [
|
||||
"*"
|
||||
],
|
||||
"searchableAttributes": [
|
||||
"*"
|
||||
],
|
||||
"filterableAttributes": [],
|
||||
"sortableAttributes": [],
|
||||
"rankingRules": [
|
||||
"typo",
|
||||
"words",
|
||||
"proximity",
|
||||
"attribute",
|
||||
"exactness"
|
||||
],
|
||||
"stopWords": [],
|
||||
"synonyms": {},
|
||||
"distinctAttribute": null
|
||||
}
|
@ -0,0 +1,38 @@
|
||||
---
|
||||
source: dump/src/reader/mod.rs
|
||||
expression: products.settings().unwrap()
|
||||
---
|
||||
{
|
||||
"displayedAttributes": [
|
||||
"*"
|
||||
],
|
||||
"searchableAttributes": [
|
||||
"*"
|
||||
],
|
||||
"filterableAttributes": [],
|
||||
"sortableAttributes": [],
|
||||
"rankingRules": [
|
||||
"typo",
|
||||
"words",
|
||||
"proximity",
|
||||
"attribute",
|
||||
"exactness"
|
||||
],
|
||||
"stopWords": [],
|
||||
"synonyms": {
|
||||
"android": [
|
||||
"phone",
|
||||
"smartphone"
|
||||
],
|
||||
"iphone": [
|
||||
"phone",
|
||||
"smartphone"
|
||||
],
|
||||
"phone": [
|
||||
"android",
|
||||
"iphone",
|
||||
"smartphone"
|
||||
]
|
||||
},
|
||||
"distinctAttribute": null
|
||||
}
|
@ -0,0 +1,31 @@
|
||||
---
|
||||
source: dump/src/reader/mod.rs
|
||||
expression: movies.settings().unwrap()
|
||||
---
|
||||
{
|
||||
"displayedAttributes": [
|
||||
"*"
|
||||
],
|
||||
"searchableAttributes": [
|
||||
"*"
|
||||
],
|
||||
"filterableAttributes": [
|
||||
"genres",
|
||||
"id"
|
||||
],
|
||||
"sortableAttributes": [
|
||||
"genres",
|
||||
"id"
|
||||
],
|
||||
"rankingRules": [
|
||||
"typo",
|
||||
"words",
|
||||
"proximity",
|
||||
"attribute",
|
||||
"exactness",
|
||||
"release_date:asc"
|
||||
],
|
||||
"stopWords": [],
|
||||
"synonyms": {},
|
||||
"distinctAttribute": null
|
||||
}
|
@ -13,7 +13,7 @@ license.workspace = true
|
||||
[dependencies]
|
||||
arbitrary = { version = "1.3.0", features = ["derive"] }
|
||||
clap = { version = "4.3.0", features = ["derive"] }
|
||||
fastrand = "1.9.0"
|
||||
fastrand = "2.0.0"
|
||||
milli = { path = "../milli" }
|
||||
serde = { version = "1.0.160", features = ["derive"] }
|
||||
serde_json = { version = "1.0.95", features = ["preserve_order"] }
|
||||
|
@ -32,11 +32,11 @@ use meilisearch_types::milli::update::{
|
||||
DeleteDocuments, DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod,
|
||||
Settings as MilliSettings,
|
||||
};
|
||||
use meilisearch_types::milli::{self, Filter, BEU32};
|
||||
use meilisearch_types::milli::{self, Filter, BEU64};
|
||||
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
|
||||
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
|
||||
use meilisearch_types::{compression, Index, VERSION_FILE_NAME};
|
||||
use roaring::RoaringBitmap;
|
||||
use roaring::RoaringTreemap;
|
||||
use time::macros::format_description;
|
||||
use time::OffsetDateTime;
|
||||
use uuid::Uuid;
|
||||
@ -58,7 +58,7 @@ pub(crate) enum Batch {
|
||||
/// The date and time at which the previously processing tasks started.
|
||||
previous_started_at: OffsetDateTime,
|
||||
/// The list of tasks that were processing when this task cancelation appeared.
|
||||
previous_processing_tasks: RoaringBitmap,
|
||||
previous_processing_tasks: RoaringTreemap,
|
||||
},
|
||||
TaskDeletion(Task),
|
||||
SnapshotCreation(Vec<Task>),
|
||||
@ -1065,7 +1065,13 @@ impl IndexScheduler {
|
||||
}
|
||||
|
||||
/// Swap the index `lhs` with the index `rhs`.
|
||||
fn apply_index_swap(&self, wtxn: &mut RwTxn, task_id: u32, lhs: &str, rhs: &str) -> Result<()> {
|
||||
fn apply_index_swap(
|
||||
&self,
|
||||
wtxn: &mut RwTxn,
|
||||
task_id: TaskId,
|
||||
lhs: &str,
|
||||
rhs: &str,
|
||||
) -> Result<()> {
|
||||
// 1. Verify that both lhs and rhs are existing indexes
|
||||
let index_lhs_exists = self.index_mapper.index_exists(wtxn, lhs)?;
|
||||
if !index_lhs_exists {
|
||||
@ -1086,7 +1092,7 @@ impl IndexScheduler {
|
||||
for task_id in &index_lhs_task_ids | &index_rhs_task_ids {
|
||||
let mut task = self.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
|
||||
swap_index_uid_in_task(&mut task, (lhs, rhs));
|
||||
self.all_tasks.put(wtxn, &BEU32::new(task_id), &task)?;
|
||||
self.all_tasks.put(wtxn, &BEU64::new(task_id), &task)?;
|
||||
}
|
||||
|
||||
// 4. remove the task from indexuid = before_name
|
||||
@ -1389,7 +1395,11 @@ impl IndexScheduler {
|
||||
/// Delete each given task from all the databases (if it is deleteable).
|
||||
///
|
||||
/// Return the number of tasks that were actually deleted.
|
||||
fn delete_matched_tasks(&self, wtxn: &mut RwTxn, matched_tasks: &RoaringBitmap) -> Result<u64> {
|
||||
fn delete_matched_tasks(
|
||||
&self,
|
||||
wtxn: &mut RwTxn,
|
||||
matched_tasks: &RoaringTreemap,
|
||||
) -> Result<u64> {
|
||||
// 1. Remove from this list the tasks that we are not allowed to delete
|
||||
let enqueued_tasks = self.get_status(wtxn, Status::Enqueued)?;
|
||||
let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone();
|
||||
@ -1404,7 +1414,7 @@ impl IndexScheduler {
|
||||
let mut affected_indexes = HashSet::new();
|
||||
let mut affected_statuses = HashSet::new();
|
||||
let mut affected_kinds = HashSet::new();
|
||||
let mut affected_canceled_by = RoaringBitmap::new();
|
||||
let mut affected_canceled_by = RoaringTreemap::new();
|
||||
|
||||
for task_id in to_delete_tasks.iter() {
|
||||
let task = self.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
|
||||
@ -1441,10 +1451,10 @@ impl IndexScheduler {
|
||||
}
|
||||
|
||||
for task in to_delete_tasks.iter() {
|
||||
self.all_tasks.delete(wtxn, &BEU32::new(task))?;
|
||||
self.all_tasks.delete(wtxn, &BEU64::new(task))?;
|
||||
}
|
||||
for canceled_by in affected_canceled_by {
|
||||
let canceled_by = BEU32::new(canceled_by);
|
||||
let canceled_by = BEU64::new(canceled_by);
|
||||
if let Some(mut tasks) = self.canceled_by.get(wtxn, &canceled_by)? {
|
||||
tasks -= &to_delete_tasks;
|
||||
if tasks.is_empty() {
|
||||
@ -1465,9 +1475,9 @@ impl IndexScheduler {
|
||||
&self,
|
||||
wtxn: &mut RwTxn,
|
||||
cancel_task_id: TaskId,
|
||||
matched_tasks: &RoaringBitmap,
|
||||
matched_tasks: &RoaringTreemap,
|
||||
previous_started_at: OffsetDateTime,
|
||||
previous_processing_tasks: &RoaringBitmap,
|
||||
previous_processing_tasks: &RoaringTreemap,
|
||||
) -> Result<Vec<Uuid>> {
|
||||
let now = OffsetDateTime::now_utc();
|
||||
|
||||
@ -1492,7 +1502,7 @@ impl IndexScheduler {
|
||||
task.details = task.details.map(|d| d.to_failed());
|
||||
self.update_task(wtxn, &task)?;
|
||||
}
|
||||
self.canceled_by.put(wtxn, &BEU32::new(cancel_task_id), &tasks_to_cancel)?;
|
||||
self.canceled_by.put(wtxn, &BEU64::new(cancel_task_id), &tasks_to_cancel)?;
|
||||
|
||||
Ok(content_files_to_delete)
|
||||
}
|
||||
|
@ -48,6 +48,8 @@ impl From<DateField> for Code {
|
||||
pub enum Error {
|
||||
#[error("{1}")]
|
||||
WithCustomErrorCode(Code, Box<Self>),
|
||||
#[error("Received bad task id: {received} should be >= to {expected}.")]
|
||||
BadTaskId { received: TaskId, expected: TaskId },
|
||||
#[error("Index `{0}` not found.")]
|
||||
IndexNotFound(String),
|
||||
#[error("Index `{0}` already exists.")]
|
||||
@ -159,6 +161,7 @@ impl Error {
|
||||
match self {
|
||||
Error::IndexNotFound(_)
|
||||
| Error::WithCustomErrorCode(_, _)
|
||||
| Error::BadTaskId { .. }
|
||||
| Error::IndexAlreadyExists(_)
|
||||
| Error::SwapDuplicateIndexFound(_)
|
||||
| Error::SwapDuplicateIndexesFound(_)
|
||||
@ -202,6 +205,7 @@ impl ErrorCode for Error {
|
||||
fn error_code(&self) -> Code {
|
||||
match self {
|
||||
Error::WithCustomErrorCode(code, _) => *code,
|
||||
Error::BadTaskId { .. } => Code::BadRequest,
|
||||
Error::IndexNotFound(_) => Code::IndexNotFound,
|
||||
Error::IndexAlreadyExists(_) => Code::IndexAlreadyExists,
|
||||
Error::SwapDuplicateIndexesFound(_) => Code::InvalidSwapDuplicateIndexFound,
|
||||
|
@ -3,9 +3,10 @@ use std::fmt::Write;
|
||||
|
||||
use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
|
||||
use meilisearch_types::heed::{Database, RoTxn};
|
||||
use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};
|
||||
use meilisearch_types::milli::heed_codec::{CboRoaringTreemapCodec, RoaringTreemapCodec};
|
||||
use meilisearch_types::milli::BEU64;
|
||||
use meilisearch_types::tasks::{Details, Task};
|
||||
use roaring::RoaringBitmap;
|
||||
use roaring::RoaringTreemap;
|
||||
|
||||
use crate::index_mapper::IndexMapper;
|
||||
use crate::{IndexScheduler, Kind, Status, BEI128};
|
||||
@ -47,7 +48,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
|
||||
let processing_tasks = processing_tasks.read().unwrap().processing.clone();
|
||||
snap.push_str(&format!("### Autobatching Enabled = {autobatching_enabled}\n"));
|
||||
snap.push_str("### Processing Tasks:\n");
|
||||
snap.push_str(&snapshot_bitmap(&processing_tasks));
|
||||
snap.push_str(&snapshot_treemap(&processing_tasks));
|
||||
snap.push_str("\n----------------------------------------------------------------------\n");
|
||||
|
||||
snap.push_str("### All Tasks:\n");
|
||||
@ -103,7 +104,7 @@ pub fn snapshot_file_store(file_store: &file_store::FileStore) -> String {
|
||||
snap
|
||||
}
|
||||
|
||||
pub fn snapshot_bitmap(r: &RoaringBitmap) -> String {
|
||||
pub fn snapshot_treemap(r: &RoaringTreemap) -> String {
|
||||
let mut snap = String::new();
|
||||
snap.push('[');
|
||||
for x in r {
|
||||
@ -113,7 +114,7 @@ pub fn snapshot_bitmap(r: &RoaringBitmap) -> String {
|
||||
snap
|
||||
}
|
||||
|
||||
pub fn snapshot_all_tasks(rtxn: &RoTxn, db: Database<OwnedType<BEU32>, SerdeJson<Task>>) -> String {
|
||||
pub fn snapshot_all_tasks(rtxn: &RoTxn, db: Database<OwnedType<BEU64>, SerdeJson<Task>>) -> String {
|
||||
let mut snap = String::new();
|
||||
let iter = db.iter(rtxn).unwrap();
|
||||
for next in iter {
|
||||
@ -125,13 +126,13 @@ pub fn snapshot_all_tasks(rtxn: &RoTxn, db: Database<OwnedType<BEU32>, SerdeJson
|
||||
|
||||
pub fn snapshot_date_db(
|
||||
rtxn: &RoTxn,
|
||||
db: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
|
||||
db: Database<OwnedType<BEI128>, CboRoaringTreemapCodec>,
|
||||
) -> String {
|
||||
let mut snap = String::new();
|
||||
let iter = db.iter(rtxn).unwrap();
|
||||
for next in iter {
|
||||
let (_timestamp, task_ids) = next.unwrap();
|
||||
snap.push_str(&format!("[timestamp] {}\n", snapshot_bitmap(&task_ids)));
|
||||
snap.push_str(&format!("[timestamp] {}\n", snapshot_treemap(&task_ids)));
|
||||
}
|
||||
snap
|
||||
}
|
||||
@ -216,45 +217,48 @@ fn snapshot_details(d: &Details) -> String {
|
||||
|
||||
pub fn snapshot_status(
|
||||
rtxn: &RoTxn,
|
||||
db: Database<SerdeBincode<Status>, RoaringBitmapCodec>,
|
||||
db: Database<SerdeBincode<Status>, RoaringTreemapCodec>,
|
||||
) -> String {
|
||||
let mut snap = String::new();
|
||||
let iter = db.iter(rtxn).unwrap();
|
||||
for next in iter {
|
||||
let (status, task_ids) = next.unwrap();
|
||||
writeln!(snap, "{status} {}", snapshot_bitmap(&task_ids)).unwrap();
|
||||
writeln!(snap, "{status} {}", snapshot_treemap(&task_ids)).unwrap();
|
||||
}
|
||||
snap
|
||||
}
|
||||
pub fn snapshot_kind(rtxn: &RoTxn, db: Database<SerdeBincode<Kind>, RoaringBitmapCodec>) -> String {
|
||||
let mut snap = String::new();
|
||||
let iter = db.iter(rtxn).unwrap();
|
||||
for next in iter {
|
||||
let (kind, task_ids) = next.unwrap();
|
||||
let kind = serde_json::to_string(&kind).unwrap();
|
||||
writeln!(snap, "{kind} {}", snapshot_bitmap(&task_ids)).unwrap();
|
||||
}
|
||||
snap
|
||||
}
|
||||
|
||||
pub fn snapshot_index_tasks(rtxn: &RoTxn, db: Database<Str, RoaringBitmapCodec>) -> String {
|
||||
let mut snap = String::new();
|
||||
let iter = db.iter(rtxn).unwrap();
|
||||
for next in iter {
|
||||
let (index, task_ids) = next.unwrap();
|
||||
writeln!(snap, "{index} {}", snapshot_bitmap(&task_ids)).unwrap();
|
||||
}
|
||||
snap
|
||||
}
|
||||
pub fn snapshot_canceled_by(
|
||||
pub fn snapshot_kind(
|
||||
rtxn: &RoTxn,
|
||||
db: Database<OwnedType<BEU32>, RoaringBitmapCodec>,
|
||||
db: Database<SerdeBincode<Kind>, RoaringTreemapCodec>,
|
||||
) -> String {
|
||||
let mut snap = String::new();
|
||||
let iter = db.iter(rtxn).unwrap();
|
||||
for next in iter {
|
||||
let (kind, task_ids) = next.unwrap();
|
||||
writeln!(snap, "{kind} {}", snapshot_bitmap(&task_ids)).unwrap();
|
||||
let kind = serde_json::to_string(&kind).unwrap();
|
||||
writeln!(snap, "{kind} {}", snapshot_treemap(&task_ids)).unwrap();
|
||||
}
|
||||
snap
|
||||
}
|
||||
|
||||
pub fn snapshot_index_tasks(rtxn: &RoTxn, db: Database<Str, RoaringTreemapCodec>) -> String {
|
||||
let mut snap = String::new();
|
||||
let iter = db.iter(rtxn).unwrap();
|
||||
for next in iter {
|
||||
let (index, task_ids) = next.unwrap();
|
||||
writeln!(snap, "{index} {}", snapshot_treemap(&task_ids)).unwrap();
|
||||
}
|
||||
snap
|
||||
}
|
||||
pub fn snapshot_canceled_by(
|
||||
rtxn: &RoTxn,
|
||||
db: Database<OwnedType<BEU64>, RoaringTreemapCodec>,
|
||||
) -> String {
|
||||
let mut snap = String::new();
|
||||
let iter = db.iter(rtxn).unwrap();
|
||||
for next in iter {
|
||||
let (kind, task_ids) = next.unwrap();
|
||||
writeln!(snap, "{kind} {}", snapshot_treemap(&task_ids)).unwrap();
|
||||
}
|
||||
snap
|
||||
}
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -7,7 +7,7 @@ source: index-scheduler/src/lib.rs
|
||||
----------------------------------------------------------------------
|
||||
### All Tasks:
|
||||
0 {uid: 0, status: canceled, canceled_by: 1, details: { received_documents: 1, indexed_documents: Some(0) }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
|
||||
1 {uid: 1, status: succeeded, details: { matched_tasks: 1, canceled_tasks: Some(1), original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringBitmap<[0]> }}
|
||||
1 {uid: 1, status: succeeded, details: { matched_tasks: 1, canceled_tasks: Some(1), original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringTreemap<[0]> }}
|
||||
----------------------------------------------------------------------
|
||||
### Status:
|
||||
enqueued []
|
||||
|
@ -7,7 +7,7 @@ source: index-scheduler/src/lib.rs
|
||||
----------------------------------------------------------------------
|
||||
### All Tasks:
|
||||
0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
|
||||
1 {uid: 1, status: enqueued, details: { matched_tasks: 1, canceled_tasks: None, original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringBitmap<[0]> }}
|
||||
1 {uid: 1, status: enqueued, details: { matched_tasks: 1, canceled_tasks: None, original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringTreemap<[0]> }}
|
||||
----------------------------------------------------------------------
|
||||
### Status:
|
||||
enqueued [0,1,]
|
||||
|
@ -9,7 +9,7 @@ source: index-scheduler/src/lib.rs
|
||||
0 {uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
|
||||
1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "beavero", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
|
||||
2 {uid: 2, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "wolfo", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000002, documents_count: 1, allow_index_creation: true }}
|
||||
3 {uid: 3, status: enqueued, details: { matched_tasks: 3, canceled_tasks: None, original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringBitmap<[0, 1, 2]> }}
|
||||
3 {uid: 3, status: enqueued, details: { matched_tasks: 3, canceled_tasks: None, original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringTreemap<[0, 1, 2]> }}
|
||||
----------------------------------------------------------------------
|
||||
### Status:
|
||||
enqueued [1,2,3,]
|
||||
|
@ -9,7 +9,7 @@ source: index-scheduler/src/lib.rs
|
||||
0 {uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
|
||||
1 {uid: 1, status: canceled, canceled_by: 3, details: { received_documents: 1, indexed_documents: Some(0) }, kind: DocumentAdditionOrUpdate { index_uid: "beavero", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
|
||||
2 {uid: 2, status: canceled, canceled_by: 3, details: { received_documents: 1, indexed_documents: Some(0) }, kind: DocumentAdditionOrUpdate { index_uid: "wolfo", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000002, documents_count: 1, allow_index_creation: true }}
|
||||
3 {uid: 3, status: succeeded, details: { matched_tasks: 3, canceled_tasks: Some(2), original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringBitmap<[0, 1, 2]> }}
|
||||
3 {uid: 3, status: succeeded, details: { matched_tasks: 3, canceled_tasks: Some(2), original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringTreemap<[0, 1, 2]> }}
|
||||
----------------------------------------------------------------------
|
||||
### Status:
|
||||
enqueued []
|
||||
|
@ -9,7 +9,7 @@ source: index-scheduler/src/lib.rs
|
||||
0 {uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
|
||||
1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "beavero", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
|
||||
2 {uid: 2, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "wolfo", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000002, documents_count: 1, allow_index_creation: true }}
|
||||
3 {uid: 3, status: enqueued, details: { matched_tasks: 3, canceled_tasks: None, original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringBitmap<[0, 1, 2]> }}
|
||||
3 {uid: 3, status: enqueued, details: { matched_tasks: 3, canceled_tasks: None, original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringTreemap<[0, 1, 2]> }}
|
||||
----------------------------------------------------------------------
|
||||
### Status:
|
||||
enqueued [1,2,3,]
|
||||
|
@ -7,7 +7,7 @@ source: index-scheduler/src/lib.rs
|
||||
----------------------------------------------------------------------
|
||||
### All Tasks:
|
||||
0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
|
||||
1 {uid: 1, status: enqueued, details: { matched_tasks: 1, canceled_tasks: None, original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringBitmap<[0]> }}
|
||||
1 {uid: 1, status: enqueued, details: { matched_tasks: 1, canceled_tasks: None, original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringTreemap<[0]> }}
|
||||
----------------------------------------------------------------------
|
||||
### Status:
|
||||
enqueued [0,1,]
|
||||
|
@ -7,7 +7,7 @@ source: index-scheduler/src/lib.rs
|
||||
----------------------------------------------------------------------
|
||||
### All Tasks:
|
||||
0 {uid: 0, status: canceled, canceled_by: 1, details: { received_documents: 1, indexed_documents: Some(0) }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
|
||||
1 {uid: 1, status: succeeded, details: { matched_tasks: 1, canceled_tasks: Some(1), original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringBitmap<[0]> }}
|
||||
1 {uid: 1, status: succeeded, details: { matched_tasks: 1, canceled_tasks: Some(1), original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringTreemap<[0]> }}
|
||||
----------------------------------------------------------------------
|
||||
### Status:
|
||||
enqueued []
|
||||
|
@ -7,7 +7,7 @@ source: index-scheduler/src/lib.rs
|
||||
----------------------------------------------------------------------
|
||||
### All Tasks:
|
||||
0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
|
||||
1 {uid: 1, status: enqueued, details: { matched_tasks: 1, canceled_tasks: None, original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringBitmap<[0]> }}
|
||||
1 {uid: 1, status: enqueued, details: { matched_tasks: 1, canceled_tasks: None, original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringTreemap<[0]> }}
|
||||
----------------------------------------------------------------------
|
||||
### Status:
|
||||
enqueued [0,1,]
|
||||
|
@ -7,7 +7,7 @@ source: index-scheduler/src/lib.rs
|
||||
----------------------------------------------------------------------
|
||||
### All Tasks:
|
||||
0 {uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
|
||||
1 {uid: 1, status: succeeded, details: { matched_tasks: 1, canceled_tasks: Some(0), original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringBitmap<[0]> }}
|
||||
1 {uid: 1, status: succeeded, details: { matched_tasks: 1, canceled_tasks: Some(0), original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringTreemap<[0]> }}
|
||||
----------------------------------------------------------------------
|
||||
### Status:
|
||||
enqueued []
|
||||
|
@ -9,7 +9,7 @@ source: index-scheduler/src/lib.rs
|
||||
0 {uid: 0, status: succeeded, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
|
||||
1 {uid: 1, status: canceled, canceled_by: 3, details: { primary_key: Some("sheep") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("sheep") }}
|
||||
2 {uid: 2, status: canceled, canceled_by: 3, details: { swaps: [IndexSwap { indexes: ("catto", "doggo") }] }, kind: IndexSwap { swaps: [IndexSwap { indexes: ("catto", "doggo") }] }}
|
||||
3 {uid: 3, status: succeeded, details: { matched_tasks: 3, canceled_tasks: Some(0), original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringBitmap<[0, 1, 2]> }}
|
||||
3 {uid: 3, status: succeeded, details: { matched_tasks: 3, canceled_tasks: Some(0), original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringTreemap<[0, 1, 2]> }}
|
||||
----------------------------------------------------------------------
|
||||
### Status:
|
||||
enqueued []
|
||||
|
@ -7,8 +7,8 @@ source: index-scheduler/src/lib.rs
|
||||
----------------------------------------------------------------------
|
||||
### All Tasks:
|
||||
1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
|
||||
2 {uid: 2, status: succeeded, details: { matched_tasks: 1, deleted_tasks: Some(1), original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringBitmap<[0]> }}
|
||||
3 {uid: 3, status: succeeded, details: { matched_tasks: 1, deleted_tasks: Some(0), original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringBitmap<[0]> }}
|
||||
2 {uid: 2, status: succeeded, details: { matched_tasks: 1, deleted_tasks: Some(1), original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringTreemap<[0]> }}
|
||||
3 {uid: 3, status: succeeded, details: { matched_tasks: 1, deleted_tasks: Some(0), original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringTreemap<[0]> }}
|
||||
----------------------------------------------------------------------
|
||||
### Status:
|
||||
enqueued [1,]
|
||||
|
@ -8,7 +8,7 @@ source: index-scheduler/src/lib.rs
|
||||
### All Tasks:
|
||||
0 {uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
|
||||
1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
|
||||
2 {uid: 2, status: enqueued, details: { matched_tasks: 1, deleted_tasks: None, original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringBitmap<[0]> }}
|
||||
2 {uid: 2, status: enqueued, details: { matched_tasks: 1, deleted_tasks: None, original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringTreemap<[0]> }}
|
||||
----------------------------------------------------------------------
|
||||
### Status:
|
||||
enqueued [1,2,]
|
||||
|
@ -7,7 +7,7 @@ source: index-scheduler/src/lib.rs
|
||||
----------------------------------------------------------------------
|
||||
### All Tasks:
|
||||
1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
|
||||
2 {uid: 2, status: succeeded, details: { matched_tasks: 1, deleted_tasks: Some(1), original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringBitmap<[0]> }}
|
||||
2 {uid: 2, status: succeeded, details: { matched_tasks: 1, deleted_tasks: Some(1), original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringTreemap<[0]> }}
|
||||
----------------------------------------------------------------------
|
||||
### Status:
|
||||
enqueued [1,]
|
||||
|
@ -9,7 +9,7 @@ source: index-scheduler/src/lib.rs
|
||||
0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
|
||||
1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
|
||||
2 {uid: 2, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
|
||||
3 {uid: 3, status: succeeded, details: { matched_tasks: 2, deleted_tasks: Some(0), original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringBitmap<[0, 1]> }}
|
||||
3 {uid: 3, status: succeeded, details: { matched_tasks: 2, deleted_tasks: Some(0), original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringTreemap<[0, 1]> }}
|
||||
----------------------------------------------------------------------
|
||||
### Status:
|
||||
enqueued [0,1,2,]
|
||||
|
@ -9,7 +9,7 @@ source: index-scheduler/src/lib.rs
|
||||
0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
|
||||
1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
|
||||
2 {uid: 2, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
|
||||
3 {uid: 3, status: enqueued, details: { matched_tasks: 2, deleted_tasks: None, original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringBitmap<[0, 1]> }}
|
||||
3 {uid: 3, status: enqueued, details: { matched_tasks: 2, deleted_tasks: None, original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringTreemap<[0, 1]> }}
|
||||
----------------------------------------------------------------------
|
||||
### Status:
|
||||
enqueued [0,1,2,3,]
|
||||
|
@ -9,7 +9,7 @@ source: index-scheduler/src/lib.rs
|
||||
0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
|
||||
1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
|
||||
2 {uid: 2, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
|
||||
3 {uid: 3, status: enqueued, details: { matched_tasks: 2, deleted_tasks: None, original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringBitmap<[0, 1]> }}
|
||||
3 {uid: 3, status: enqueued, details: { matched_tasks: 2, deleted_tasks: None, original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringTreemap<[0, 1]> }}
|
||||
----------------------------------------------------------------------
|
||||
### Status:
|
||||
enqueued [0,1,2,3,]
|
||||
|
@ -41,6 +41,18 @@ source: index-scheduler/src/lib.rs
|
||||
"taskDeletion": {
|
||||
"query": "[query]",
|
||||
"tasks": [
|
||||
1,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
58,
|
||||
48,
|
||||
0,
|
||||
|
@ -21,6 +21,18 @@ source: index-scheduler/src/lib.rs
|
||||
"taskDeletion": {
|
||||
"query": "[query]",
|
||||
"tasks": [
|
||||
1,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
58,
|
||||
48,
|
||||
0,
|
||||
|
@ -106,6 +106,18 @@ source: index-scheduler/src/lib.rs
|
||||
"taskDeletion": {
|
||||
"query": "[query]",
|
||||
"tasks": [
|
||||
1,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
58,
|
||||
48,
|
||||
0,
|
||||
|
@ -61,6 +61,18 @@ source: index-scheduler/src/lib.rs
|
||||
"taskDeletion": {
|
||||
"query": "[query]",
|
||||
"tasks": [
|
||||
1,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
0,
|
||||
58,
|
||||
48,
|
||||
0,
|
||||
|
@ -5,15 +5,16 @@ use std::ops::Bound;
|
||||
|
||||
use meilisearch_types::heed::types::{DecodeIgnore, OwnedType};
|
||||
use meilisearch_types::heed::{Database, RoTxn, RwTxn};
|
||||
use meilisearch_types::milli::{CboRoaringBitmapCodec, BEU32};
|
||||
use meilisearch_types::milli::heed_codec::CboRoaringTreemapCodec;
|
||||
use meilisearch_types::milli::BEU64;
|
||||
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status};
|
||||
use roaring::{MultiOps, RoaringBitmap};
|
||||
use roaring::{MultiOps, RoaringTreemap};
|
||||
use time::OffsetDateTime;
|
||||
|
||||
use crate::{Error, IndexScheduler, Result, Task, TaskId, BEI128};
|
||||
|
||||
impl IndexScheduler {
|
||||
pub(crate) fn all_task_ids(&self, rtxn: &RoTxn) -> Result<RoaringBitmap> {
|
||||
pub(crate) fn all_task_ids(&self, rtxn: &RoTxn) -> Result<RoaringTreemap> {
|
||||
enum_iterator::all().map(|s| self.get_status(rtxn, s)).union()
|
||||
}
|
||||
|
||||
@ -26,7 +27,7 @@ impl IndexScheduler {
|
||||
}
|
||||
|
||||
pub(crate) fn get_task(&self, rtxn: &RoTxn, task_id: TaskId) -> Result<Option<Task>> {
|
||||
Ok(self.all_tasks.get(rtxn, &BEU32::new(task_id))?)
|
||||
Ok(self.all_tasks.get(rtxn, &BEU64::new(task_id))?)
|
||||
}
|
||||
|
||||
/// Convert an iterator to a `Vec` of tasks. The tasks MUST exist or a
|
||||
@ -88,12 +89,12 @@ impl IndexScheduler {
|
||||
}
|
||||
}
|
||||
|
||||
self.all_tasks.put(wtxn, &BEU32::new(task.uid), task)?;
|
||||
self.all_tasks.put(wtxn, &BEU64::new(task.uid), task)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns the whole set of tasks that belongs to this index.
|
||||
pub(crate) fn index_tasks(&self, rtxn: &RoTxn, index: &str) -> Result<RoaringBitmap> {
|
||||
pub(crate) fn index_tasks(&self, rtxn: &RoTxn, index: &str) -> Result<RoaringTreemap> {
|
||||
Ok(self.index_tasks.get(rtxn, index)?.unwrap_or_default())
|
||||
}
|
||||
|
||||
@ -101,7 +102,7 @@ impl IndexScheduler {
|
||||
&self,
|
||||
wtxn: &mut RwTxn,
|
||||
index: &str,
|
||||
f: impl Fn(&mut RoaringBitmap),
|
||||
f: impl Fn(&mut RoaringTreemap),
|
||||
) -> Result<()> {
|
||||
let mut tasks = self.index_tasks(wtxn, index)?;
|
||||
f(&mut tasks);
|
||||
@ -114,7 +115,7 @@ impl IndexScheduler {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn get_status(&self, rtxn: &RoTxn, status: Status) -> Result<RoaringBitmap> {
|
||||
pub(crate) fn get_status(&self, rtxn: &RoTxn, status: Status) -> Result<RoaringTreemap> {
|
||||
Ok(self.status.get(rtxn, &status)?.unwrap_or_default())
|
||||
}
|
||||
|
||||
@ -122,7 +123,7 @@ impl IndexScheduler {
|
||||
&self,
|
||||
wtxn: &mut RwTxn,
|
||||
status: Status,
|
||||
bitmap: &RoaringBitmap,
|
||||
bitmap: &RoaringTreemap,
|
||||
) -> Result<()> {
|
||||
Ok(self.status.put(wtxn, &status, bitmap)?)
|
||||
}
|
||||
@ -131,7 +132,7 @@ impl IndexScheduler {
|
||||
&self,
|
||||
wtxn: &mut RwTxn,
|
||||
status: Status,
|
||||
f: impl Fn(&mut RoaringBitmap),
|
||||
f: impl Fn(&mut RoaringTreemap),
|
||||
) -> Result<()> {
|
||||
let mut tasks = self.get_status(wtxn, status)?;
|
||||
f(&mut tasks);
|
||||
@ -140,7 +141,7 @@ impl IndexScheduler {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn get_kind(&self, rtxn: &RoTxn, kind: Kind) -> Result<RoaringBitmap> {
|
||||
pub(crate) fn get_kind(&self, rtxn: &RoTxn, kind: Kind) -> Result<RoaringTreemap> {
|
||||
Ok(self.kind.get(rtxn, &kind)?.unwrap_or_default())
|
||||
}
|
||||
|
||||
@ -148,7 +149,7 @@ impl IndexScheduler {
|
||||
&self,
|
||||
wtxn: &mut RwTxn,
|
||||
kind: Kind,
|
||||
bitmap: &RoaringBitmap,
|
||||
bitmap: &RoaringTreemap,
|
||||
) -> Result<()> {
|
||||
Ok(self.kind.put(wtxn, &kind, bitmap)?)
|
||||
}
|
||||
@ -157,7 +158,7 @@ impl IndexScheduler {
|
||||
&self,
|
||||
wtxn: &mut RwTxn,
|
||||
kind: Kind,
|
||||
f: impl Fn(&mut RoaringBitmap),
|
||||
f: impl Fn(&mut RoaringTreemap),
|
||||
) -> Result<()> {
|
||||
let mut tasks = self.get_kind(wtxn, kind)?;
|
||||
f(&mut tasks);
|
||||
@ -169,20 +170,20 @@ impl IndexScheduler {
|
||||
|
||||
pub(crate) fn insert_task_datetime(
|
||||
wtxn: &mut RwTxn,
|
||||
database: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
|
||||
database: Database<OwnedType<BEI128>, CboRoaringTreemapCodec>,
|
||||
time: OffsetDateTime,
|
||||
task_id: TaskId,
|
||||
) -> Result<()> {
|
||||
let timestamp = BEI128::new(time.unix_timestamp_nanos());
|
||||
let mut task_ids = database.get(wtxn, ×tamp)?.unwrap_or_default();
|
||||
task_ids.insert(task_id);
|
||||
database.put(wtxn, ×tamp, &RoaringBitmap::from_iter(task_ids))?;
|
||||
database.put(wtxn, ×tamp, &RoaringTreemap::from_iter(task_ids))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn remove_task_datetime(
|
||||
wtxn: &mut RwTxn,
|
||||
database: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
|
||||
database: Database<OwnedType<BEI128>, CboRoaringTreemapCodec>,
|
||||
time: OffsetDateTime,
|
||||
task_id: TaskId,
|
||||
) -> Result<()> {
|
||||
@ -192,7 +193,7 @@ pub(crate) fn remove_task_datetime(
|
||||
if existing.is_empty() {
|
||||
database.delete(wtxn, ×tamp)?;
|
||||
} else {
|
||||
database.put(wtxn, ×tamp, &RoaringBitmap::from_iter(existing))?;
|
||||
database.put(wtxn, ×tamp, &RoaringTreemap::from_iter(existing))?;
|
||||
}
|
||||
}
|
||||
|
||||
@ -201,8 +202,8 @@ pub(crate) fn remove_task_datetime(
|
||||
|
||||
pub(crate) fn keep_tasks_within_datetimes(
|
||||
rtxn: &RoTxn,
|
||||
tasks: &mut RoaringBitmap,
|
||||
database: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
|
||||
tasks: &mut RoaringTreemap,
|
||||
database: Database<OwnedType<BEI128>, CboRoaringTreemapCodec>,
|
||||
after: Option<OffsetDateTime>,
|
||||
before: Option<OffsetDateTime>,
|
||||
) -> Result<()> {
|
||||
@ -212,7 +213,7 @@ pub(crate) fn keep_tasks_within_datetimes(
|
||||
(Some(after), None) => (Bound::Excluded(*after), Bound::Unbounded),
|
||||
(Some(after), Some(before)) => (Bound::Excluded(*after), Bound::Excluded(*before)),
|
||||
};
|
||||
let mut collected_task_ids = RoaringBitmap::new();
|
||||
let mut collected_task_ids = RoaringTreemap::new();
|
||||
let start = map_bound(start, |b| BEI128::new(b.unix_timestamp_nanos()));
|
||||
let end = map_bound(end, |b| BEI128::new(b.unix_timestamp_nanos()));
|
||||
let iter = database.range(rtxn, &(start, end))?;
|
||||
|
@ -167,7 +167,9 @@ macro_rules! snapshot {
|
||||
let (settings, snap_name, _) = $crate::default_snapshot_settings_for_test(test_name, Some(&snap_name));
|
||||
settings.bind(|| {
|
||||
let snap = format!("{}", $value);
|
||||
meili_snap::insta::assert_snapshot!(format!("{}", snap_name), snap);
|
||||
insta::allow_duplicates! {
|
||||
meili_snap::insta::assert_snapshot!(format!("{}", snap_name), snap);
|
||||
}
|
||||
});
|
||||
};
|
||||
($value:expr, @$inline:literal) => {
|
||||
@ -176,7 +178,9 @@ macro_rules! snapshot {
|
||||
let (settings, _, _) = $crate::default_snapshot_settings_for_test("", Some("_dummy_argument"));
|
||||
settings.bind(|| {
|
||||
let snap = format!("{}", $value);
|
||||
meili_snap::insta::assert_snapshot!(snap, @$inline);
|
||||
insta::allow_duplicates! {
|
||||
meili_snap::insta::assert_snapshot!(snap, @$inline);
|
||||
}
|
||||
});
|
||||
};
|
||||
($value:expr) => {
|
||||
@ -194,7 +198,9 @@ macro_rules! snapshot {
|
||||
let (settings, snap_name, _) = $crate::default_snapshot_settings_for_test(test_name, None);
|
||||
settings.bind(|| {
|
||||
let snap = format!("{}", $value);
|
||||
meili_snap::insta::assert_snapshot!(format!("{}", snap_name), snap);
|
||||
insta::allow_duplicates! {
|
||||
meili_snap::insta::assert_snapshot!(format!("{}", snap_name), snap);
|
||||
}
|
||||
});
|
||||
};
|
||||
}
|
||||
|
@ -15,13 +15,13 @@ actix-web = { version = "4.3.1", default-features = false }
|
||||
anyhow = "1.0.70"
|
||||
convert_case = "0.6.0"
|
||||
csv = "1.2.1"
|
||||
deserr = "0.5.0"
|
||||
deserr = { version = "0.6.0", features = ["actix-web"]}
|
||||
either = { version = "1.8.1", features = ["serde"] }
|
||||
enum-iterator = "1.4.0"
|
||||
file-store = { path = "../file-store" }
|
||||
flate2 = "1.0.25"
|
||||
fst = "0.4.7"
|
||||
memmap2 = "0.5.10"
|
||||
memmap2 = "0.7.1"
|
||||
milli = { path = "../milli" }
|
||||
roaring = { version = "0.10.1", features = ["serde"] }
|
||||
serde = { version = "1.0.160", features = ["derive"] }
|
||||
|
@ -104,6 +104,7 @@ macro_rules! impl_from_query_param_wrap_original_value_in_error {
|
||||
}
|
||||
impl_from_query_param_wrap_original_value_in_error!(usize, DeserrParseIntError);
|
||||
impl_from_query_param_wrap_original_value_in_error!(u32, DeserrParseIntError);
|
||||
impl_from_query_param_wrap_original_value_in_error!(u64, DeserrParseIntError);
|
||||
impl_from_query_param_wrap_original_value_in_error!(bool, DeserrParseBoolError);
|
||||
|
||||
impl FromQueryParameter for String {
|
||||
|
@ -1,4 +1,3 @@
|
||||
use std::borrow::Borrow;
|
||||
use std::fmt::{self, Debug, Display};
|
||||
use std::fs::File;
|
||||
use std::io::{self, Seek, Write};
|
||||
@ -42,7 +41,7 @@ impl Display for DocumentFormatError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Self::Io(e) => write!(f, "{e}"),
|
||||
Self::MalformedPayload(me, b) => match me.borrow() {
|
||||
Self::MalformedPayload(me, b) => match me {
|
||||
Error::Json(se) => {
|
||||
let mut message = match se.classify() {
|
||||
Category::Data => {
|
||||
|
@ -5,7 +5,7 @@ use std::str::FromStr;
|
||||
|
||||
use enum_iterator::Sequence;
|
||||
use milli::update::IndexDocumentsMethod;
|
||||
use roaring::RoaringBitmap;
|
||||
use roaring::RoaringTreemap;
|
||||
use serde::{Deserialize, Serialize, Serializer};
|
||||
use time::{Duration, OffsetDateTime};
|
||||
use uuid::Uuid;
|
||||
@ -15,7 +15,7 @@ use crate::keys::Key;
|
||||
use crate::settings::{Settings, Unchecked};
|
||||
use crate::InstanceUid;
|
||||
|
||||
pub type TaskId = u32;
|
||||
pub type TaskId = u64;
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
@ -127,11 +127,11 @@ pub enum KindWithContent {
|
||||
},
|
||||
TaskCancelation {
|
||||
query: String,
|
||||
tasks: RoaringBitmap,
|
||||
tasks: RoaringTreemap,
|
||||
},
|
||||
TaskDeletion {
|
||||
query: String,
|
||||
tasks: RoaringBitmap,
|
||||
tasks: RoaringTreemap,
|
||||
},
|
||||
DumpCreation {
|
||||
keys: Vec<Key>,
|
||||
|
@ -39,7 +39,7 @@ byte-unit = { version = "4.0.19", default-features = false, features = [
|
||||
bytes = "1.4.0"
|
||||
clap = { version = "4.2.1", features = ["derive", "env"] }
|
||||
crossbeam-channel = "0.5.8"
|
||||
deserr = "0.5.0"
|
||||
deserr = { version = "0.6.0", features = ["actix-web"]}
|
||||
dump = { path = "../dump" }
|
||||
either = "1.8.1"
|
||||
env_logger = "0.10.0"
|
||||
@ -50,9 +50,9 @@ futures = "0.3.28"
|
||||
futures-util = "0.3.28"
|
||||
http = "0.2.9"
|
||||
index-scheduler = { path = "../index-scheduler" }
|
||||
indexmap = { version = "1.9.3", features = ["serde-1"] }
|
||||
indexmap = { version = "2.0.0", features = ["serde"] }
|
||||
is-terminal = "0.4.8"
|
||||
itertools = "0.10.5"
|
||||
itertools = "0.11.0"
|
||||
jsonwebtoken = "8.3.0"
|
||||
lazy_static = "1.4.0"
|
||||
log = "0.4.17"
|
||||
@ -87,7 +87,7 @@ sha2 = "0.10.6"
|
||||
siphasher = "0.3.10"
|
||||
slice-group-by = "0.3.0"
|
||||
static-files = { version = "0.2.3", optional = true }
|
||||
sysinfo = "0.28.4"
|
||||
sysinfo = "0.29.7"
|
||||
tar = "0.4.38"
|
||||
tempfile = "3.5.0"
|
||||
thiserror = "1.0.40"
|
||||
|
@ -20,7 +20,7 @@ pub struct SearchAggregator;
|
||||
#[allow(dead_code)]
|
||||
impl SearchAggregator {
|
||||
pub fn from_query(_: &dyn Any, _: &dyn Any) -> Self {
|
||||
Self::default()
|
||||
Self
|
||||
}
|
||||
|
||||
pub fn succeed(&mut self, _: &dyn Any) {}
|
||||
@ -32,7 +32,7 @@ pub struct MultiSearchAggregator;
|
||||
#[allow(dead_code)]
|
||||
impl MultiSearchAggregator {
|
||||
pub fn from_queries(_: &dyn Any, _: &dyn Any) -> Self {
|
||||
Self::default()
|
||||
Self
|
||||
}
|
||||
|
||||
pub fn succeed(&mut self) {}
|
||||
@ -44,7 +44,7 @@ pub struct FacetSearchAggregator;
|
||||
#[allow(dead_code)]
|
||||
impl FacetSearchAggregator {
|
||||
pub fn from_query(_: &dyn Any, _: &dyn Any) -> Self {
|
||||
Self::default()
|
||||
Self
|
||||
}
|
||||
|
||||
pub fn succeed(&mut self, _: &dyn Any) {}
|
||||
|
@ -203,7 +203,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
|
||||
.name(String::from("register-snapshot-tasks"))
|
||||
.spawn(move || loop {
|
||||
thread::sleep(snapshot_delay);
|
||||
if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation) {
|
||||
if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation, None) {
|
||||
error!("Error while registering snapshot: {}", e);
|
||||
}
|
||||
})
|
||||
|
@ -11,7 +11,7 @@ use crate::analytics::Analytics;
|
||||
use crate::extractors::authentication::policies::*;
|
||||
use crate::extractors::authentication::GuardedData;
|
||||
use crate::extractors::sequential_extractor::SeqHandler;
|
||||
use crate::routes::SummarizedTaskView;
|
||||
use crate::routes::{get_task_id, SummarizedTaskView};
|
||||
|
||||
pub fn configure(cfg: &mut web::ServiceConfig) {
|
||||
cfg.service(web::resource("").route(web::post().to(SeqHandler(create_dump))));
|
||||
@ -29,8 +29,9 @@ pub async fn create_dump(
|
||||
keys: auth_controller.list_keys()?,
|
||||
instance_uid: analytics.instance_uid().cloned(),
|
||||
};
|
||||
let uid = get_task_id(&req)?;
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
|
||||
|
||||
debug!("returns: {:?}", task);
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
|
@ -7,7 +7,7 @@ use bstr::ByteSlice;
|
||||
use deserr::actix_web::{AwebJson, AwebQueryParameter};
|
||||
use deserr::Deserr;
|
||||
use futures::StreamExt;
|
||||
use index_scheduler::IndexScheduler;
|
||||
use index_scheduler::{IndexScheduler, TaskId};
|
||||
use log::debug;
|
||||
use meilisearch_types::deserr::query_params::Param;
|
||||
use meilisearch_types::deserr::{DeserrJsonError, DeserrQueryParamError};
|
||||
@ -36,7 +36,7 @@ use crate::extractors::authentication::policies::*;
|
||||
use crate::extractors::authentication::GuardedData;
|
||||
use crate::extractors::payload::Payload;
|
||||
use crate::extractors::sequential_extractor::SeqHandler;
|
||||
use crate::routes::{PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT};
|
||||
use crate::routes::{get_task_id, PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT};
|
||||
use crate::search::parse_filter;
|
||||
|
||||
static ACCEPTED_CONTENT_TYPE: Lazy<Vec<String>> = Lazy::new(|| {
|
||||
@ -129,8 +129,9 @@ pub async fn delete_document(
|
||||
index_uid: index_uid.to_string(),
|
||||
documents_ids: vec![document_id],
|
||||
};
|
||||
let uid = get_task_id(&req)?;
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
|
||||
debug!("returns: {:?}", task);
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
}
|
||||
@ -277,6 +278,7 @@ pub async fn replace_documents(
|
||||
analytics.add_documents(¶ms, index_scheduler.index(&index_uid).is_err(), &req);
|
||||
|
||||
let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid);
|
||||
let uid = get_task_id(&req)?;
|
||||
let task = document_addition(
|
||||
extract_mime_type(&req)?,
|
||||
index_scheduler,
|
||||
@ -285,6 +287,7 @@ pub async fn replace_documents(
|
||||
params.csv_delimiter,
|
||||
body,
|
||||
IndexDocumentsMethod::ReplaceDocuments,
|
||||
uid,
|
||||
allow_index_creation,
|
||||
)
|
||||
.await?;
|
||||
@ -308,6 +311,7 @@ pub async fn update_documents(
|
||||
analytics.update_documents(¶ms, index_scheduler.index(&index_uid).is_err(), &req);
|
||||
|
||||
let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid);
|
||||
let uid = get_task_id(&req)?;
|
||||
let task = document_addition(
|
||||
extract_mime_type(&req)?,
|
||||
index_scheduler,
|
||||
@ -316,6 +320,7 @@ pub async fn update_documents(
|
||||
params.csv_delimiter,
|
||||
body,
|
||||
IndexDocumentsMethod::UpdateDocuments,
|
||||
uid,
|
||||
allow_index_creation,
|
||||
)
|
||||
.await?;
|
||||
@ -332,6 +337,7 @@ async fn document_addition(
|
||||
csv_delimiter: Option<u8>,
|
||||
mut body: Payload,
|
||||
method: IndexDocumentsMethod,
|
||||
task_id: Option<TaskId>,
|
||||
allow_index_creation: bool,
|
||||
) -> Result<SummarizedTaskView, MeilisearchHttpError> {
|
||||
let format = match (
|
||||
@ -445,7 +451,7 @@ async fn document_addition(
|
||||
};
|
||||
|
||||
let scheduler = index_scheduler.clone();
|
||||
let task = match tokio::task::spawn_blocking(move || scheduler.register(task)).await? {
|
||||
let task = match tokio::task::spawn_blocking(move || scheduler.register(task, task_id)).await? {
|
||||
Ok(task) => task,
|
||||
Err(e) => {
|
||||
index_scheduler.delete_update_file(uuid)?;
|
||||
@ -476,8 +482,9 @@ pub async fn delete_documents_batch(
|
||||
|
||||
let task =
|
||||
KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids };
|
||||
let uid = get_task_id(&req)?;
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
|
||||
|
||||
debug!("returns: {:?}", task);
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
@ -512,8 +519,9 @@ pub async fn delete_documents_by_filter(
|
||||
.map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?;
|
||||
let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter };
|
||||
|
||||
let uid = get_task_id(&req)?;
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
|
||||
|
||||
debug!("returns: {:?}", task);
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
@ -529,8 +537,9 @@ pub async fn clear_all_documents(
|
||||
analytics.delete_documents(DocumentDeletionKind::ClearAll, &req);
|
||||
|
||||
let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() };
|
||||
let uid = get_task_id(&req)?;
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
|
||||
|
||||
debug!("returns: {:?}", task);
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
|
@ -17,7 +17,7 @@ use serde::Serialize;
|
||||
use serde_json::json;
|
||||
use time::OffsetDateTime;
|
||||
|
||||
use super::{Pagination, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT};
|
||||
use super::{get_task_id, Pagination, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT};
|
||||
use crate::analytics::Analytics;
|
||||
use crate::extractors::authentication::policies::*;
|
||||
use crate::extractors::authentication::{AuthenticationError, GuardedData};
|
||||
@ -135,8 +135,9 @@ pub async fn create_index(
|
||||
);
|
||||
|
||||
let task = KindWithContent::IndexCreation { index_uid: uid.to_string(), primary_key };
|
||||
let uid = get_task_id(&req)?;
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
|
||||
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
} else {
|
||||
@ -203,8 +204,9 @@ pub async fn update_index(
|
||||
primary_key: body.primary_key,
|
||||
};
|
||||
|
||||
let uid = get_task_id(&req)?;
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
|
||||
|
||||
debug!("returns: {:?}", task);
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
@ -213,11 +215,13 @@ pub async fn update_index(
|
||||
pub async fn delete_index(
|
||||
index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_DELETE }>, Data<IndexScheduler>>,
|
||||
index_uid: web::Path<String>,
|
||||
req: HttpRequest,
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||
let task = KindWithContent::IndexDeletion { index_uid: index_uid.into_inner() };
|
||||
let uid = get_task_id(&req)?;
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
|
||||
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
}
|
||||
|
@ -14,7 +14,7 @@ use serde_json::json;
|
||||
use crate::analytics::Analytics;
|
||||
use crate::extractors::authentication::policies::*;
|
||||
use crate::extractors::authentication::GuardedData;
|
||||
use crate::routes::SummarizedTaskView;
|
||||
use crate::routes::{get_task_id, SummarizedTaskView};
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! make_setting_route {
|
||||
@ -33,7 +33,7 @@ macro_rules! make_setting_route {
|
||||
use $crate::extractors::authentication::policies::*;
|
||||
use $crate::extractors::authentication::GuardedData;
|
||||
use $crate::extractors::sequential_extractor::SeqHandler;
|
||||
use $crate::routes::SummarizedTaskView;
|
||||
use $crate::routes::{get_task_id, SummarizedTaskView};
|
||||
|
||||
pub async fn delete(
|
||||
index_scheduler: GuardedData<
|
||||
@ -41,6 +41,7 @@ macro_rules! make_setting_route {
|
||||
Data<IndexScheduler>,
|
||||
>,
|
||||
index_uid: web::Path<String>,
|
||||
req: HttpRequest,
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||
|
||||
@ -55,8 +56,9 @@ macro_rules! make_setting_route {
|
||||
is_deletion: true,
|
||||
allow_index_creation,
|
||||
};
|
||||
let uid = get_task_id(&req)?;
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task))
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid))
|
||||
.await??
|
||||
.into();
|
||||
|
||||
@ -97,8 +99,9 @@ macro_rules! make_setting_route {
|
||||
is_deletion: false,
|
||||
allow_index_creation,
|
||||
};
|
||||
let uid = get_task_id(&req)?;
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task))
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid))
|
||||
.await??
|
||||
.into();
|
||||
|
||||
@ -664,8 +667,9 @@ pub async fn update_all(
|
||||
is_deletion: false,
|
||||
allow_index_creation,
|
||||
};
|
||||
let uid = get_task_id(&req)?;
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
|
||||
|
||||
debug!("returns: {:?}", task);
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
@ -687,6 +691,7 @@ pub async fn get_all(
|
||||
pub async fn delete_all(
|
||||
index_scheduler: GuardedData<ActionPolicy<{ actions::SETTINGS_UPDATE }>, Data<IndexScheduler>>,
|
||||
index_uid: web::Path<String>,
|
||||
req: HttpRequest,
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||
|
||||
@ -700,8 +705,9 @@ pub async fn delete_all(
|
||||
is_deletion: true,
|
||||
allow_index_creation,
|
||||
};
|
||||
let uid = get_task_id(&req)?;
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
|
||||
|
||||
debug!("returns: {:?}", task);
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
|
@ -5,7 +5,7 @@ use actix_web::{web, HttpRequest, HttpResponse};
|
||||
use index_scheduler::IndexScheduler;
|
||||
use log::debug;
|
||||
use meilisearch_auth::AuthController;
|
||||
use meilisearch_types::error::ResponseError;
|
||||
use meilisearch_types::error::{Code, ResponseError};
|
||||
use meilisearch_types::settings::{Settings, Unchecked};
|
||||
use meilisearch_types::tasks::{Kind, Status, Task, TaskId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
@ -41,6 +41,34 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
|
||||
.service(web::scope("/experimental-features").configure(features::configure));
|
||||
}
|
||||
|
||||
pub fn get_task_id(req: &HttpRequest) -> Result<Option<TaskId>, ResponseError> {
|
||||
let task_id = req
|
||||
.headers()
|
||||
.get("TaskId")
|
||||
.map(|header| {
|
||||
header.to_str().map_err(|e| {
|
||||
ResponseError::from_msg(
|
||||
format!("TaskId is not a valid utf-8 string: {e}"),
|
||||
Code::BadRequest,
|
||||
)
|
||||
})
|
||||
})
|
||||
.transpose()?
|
||||
.map(|s| {
|
||||
s.parse::<TaskId>().map_err(|e| {
|
||||
ResponseError::from_msg(
|
||||
format!(
|
||||
"Could not parse the TaskId as a {}: {e}",
|
||||
std::any::type_name::<TaskId>(),
|
||||
),
|
||||
Code::BadRequest,
|
||||
)
|
||||
})
|
||||
})
|
||||
.transpose()?;
|
||||
Ok(task_id)
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct SummarizedTaskView {
|
||||
|
@ -10,7 +10,7 @@ use meilisearch_types::index_uid::IndexUid;
|
||||
use meilisearch_types::tasks::{IndexSwap, KindWithContent};
|
||||
use serde_json::json;
|
||||
|
||||
use super::SummarizedTaskView;
|
||||
use super::{get_task_id, SummarizedTaskView};
|
||||
use crate::analytics::Analytics;
|
||||
use crate::error::MeilisearchHttpError;
|
||||
use crate::extractors::authentication::policies::*;
|
||||
@ -61,7 +61,8 @@ pub async fn swap_indexes(
|
||||
|
||||
let task = KindWithContent::IndexSwap { swaps };
|
||||
|
||||
let task = index_scheduler.register(task)?;
|
||||
let task: SummarizedTaskView = task.into();
|
||||
let uid = get_task_id(&req)?;
|
||||
let task: SummarizedTaskView =
|
||||
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
|
||||
Ok(HttpResponse::Accepted().json(task))
|
||||
}
|
||||
|
@ -20,13 +20,13 @@ use time::macros::format_description;
|
||||
use time::{Date, Duration, OffsetDateTime, Time};
|
||||
use tokio::task;
|
||||
|
||||
use super::SummarizedTaskView;
|
||||
use super::{get_task_id, SummarizedTaskView};
|
||||
use crate::analytics::Analytics;
|
||||
use crate::extractors::authentication::policies::*;
|
||||
use crate::extractors::authentication::GuardedData;
|
||||
use crate::extractors::sequential_extractor::SeqHandler;
|
||||
|
||||
const DEFAULT_LIMIT: u32 = 20;
|
||||
const DEFAULT_LIMIT: u64 = 20;
|
||||
|
||||
pub fn configure(cfg: &mut web::ServiceConfig) {
|
||||
cfg.service(
|
||||
@ -175,14 +175,14 @@ impl From<Details> for DetailsView {
|
||||
#[deserr(error = DeserrQueryParamError, rename_all = camelCase, deny_unknown_fields)]
|
||||
pub struct TasksFilterQuery {
|
||||
#[deserr(default = Param(DEFAULT_LIMIT), error = DeserrQueryParamError<InvalidTaskLimit>)]
|
||||
pub limit: Param<u32>,
|
||||
pub limit: Param<TaskId>,
|
||||
#[deserr(default, error = DeserrQueryParamError<InvalidTaskFrom>)]
|
||||
pub from: Option<Param<TaskId>>,
|
||||
|
||||
#[deserr(default, error = DeserrQueryParamError<InvalidTaskUids>)]
|
||||
pub uids: OptionStarOrList<u32>,
|
||||
pub uids: OptionStarOrList<TaskId>,
|
||||
#[deserr(default, error = DeserrQueryParamError<InvalidTaskCanceledBy>)]
|
||||
pub canceled_by: OptionStarOrList<u32>,
|
||||
pub canceled_by: OptionStarOrList<TaskId>,
|
||||
#[deserr(default, error = DeserrQueryParamError<InvalidTaskTypes>)]
|
||||
pub types: OptionStarOrList<Kind>,
|
||||
#[deserr(default, error = DeserrQueryParamError<InvalidTaskStatuses>)]
|
||||
@ -249,9 +249,9 @@ impl TaskDeletionOrCancelationQuery {
|
||||
#[deserr(error = DeserrQueryParamError, rename_all = camelCase, deny_unknown_fields)]
|
||||
pub struct TaskDeletionOrCancelationQuery {
|
||||
#[deserr(default, error = DeserrQueryParamError<InvalidTaskUids>)]
|
||||
pub uids: OptionStarOrList<u32>,
|
||||
pub uids: OptionStarOrList<TaskId>,
|
||||
#[deserr(default, error = DeserrQueryParamError<InvalidTaskCanceledBy>)]
|
||||
pub canceled_by: OptionStarOrList<u32>,
|
||||
pub canceled_by: OptionStarOrList<TaskId>,
|
||||
#[deserr(default, error = DeserrQueryParamError<InvalidTaskTypes>)]
|
||||
pub types: OptionStarOrList<Kind>,
|
||||
#[deserr(default, error = DeserrQueryParamError<InvalidTaskStatuses>)]
|
||||
@ -333,7 +333,9 @@ async fn cancel_tasks(
|
||||
let task_cancelation =
|
||||
KindWithContent::TaskCancelation { query: format!("?{}", req.query_string()), tasks };
|
||||
|
||||
let task = task::spawn_blocking(move || index_scheduler.register(task_cancelation)).await??;
|
||||
let uid = get_task_id(&req)?;
|
||||
let task =
|
||||
task::spawn_blocking(move || index_scheduler.register(task_cancelation, uid)).await??;
|
||||
let task: SummarizedTaskView = task.into();
|
||||
|
||||
Ok(HttpResponse::Ok().json(task))
|
||||
@ -378,7 +380,8 @@ async fn delete_tasks(
|
||||
let task_deletion =
|
||||
KindWithContent::TaskDeletion { query: format!("?{}", req.query_string()), tasks };
|
||||
|
||||
let task = task::spawn_blocking(move || index_scheduler.register(task_deletion)).await??;
|
||||
let uid = get_task_id(&req)?;
|
||||
let task = task::spawn_blocking(move || index_scheduler.register(task_deletion, uid)).await??;
|
||||
let task: SummarizedTaskView = task.into();
|
||||
|
||||
Ok(HttpResponse::Ok().json(task))
|
||||
@ -388,9 +391,9 @@ async fn delete_tasks(
|
||||
pub struct AllTasks {
|
||||
results: Vec<TaskView>,
|
||||
total: u64,
|
||||
limit: u32,
|
||||
from: Option<u32>,
|
||||
next: Option<u32>,
|
||||
limit: TaskId,
|
||||
from: Option<TaskId>,
|
||||
next: Option<TaskId>,
|
||||
}
|
||||
|
||||
async fn get_tasks(
|
||||
|
@ -680,6 +680,7 @@ fn compute_semantic_score(query: &[f32], vectors: Value) -> milli::Result<Option
|
||||
.map_err(InternalError::SerdeJson)?;
|
||||
Ok(vectors
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.map(|v| OrderedFloat(dot_product_similarity(query, &v)))
|
||||
.max()
|
||||
.map(OrderedFloat::into_inner))
|
||||
|
@ -199,3 +199,74 @@ async fn error_create_with_invalid_index_uid() {
|
||||
}
|
||||
"###);
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn send_task_id() {
|
||||
let server = Server::new().await;
|
||||
let app = server.init_web_app().await;
|
||||
let index = server.index("catto");
|
||||
let (response, code) = index.create(None).await;
|
||||
snapshot!(code, @"202 Accepted");
|
||||
snapshot!(json_string!(response, { ".enqueuedAt" => "[date]" }), @r###"
|
||||
{
|
||||
"taskUid": 0,
|
||||
"indexUid": "catto",
|
||||
"status": "enqueued",
|
||||
"type": "indexCreation",
|
||||
"enqueuedAt": "[date]"
|
||||
}
|
||||
"###);
|
||||
|
||||
let body = serde_json::to_string(&json!({
|
||||
"uid": "doggo",
|
||||
"primaryKey": None::<&str>,
|
||||
}))
|
||||
.unwrap();
|
||||
let req = test::TestRequest::post()
|
||||
.uri("/indexes")
|
||||
.insert_header(("TaskId", "25"))
|
||||
.insert_header(ContentType::json())
|
||||
.set_payload(body)
|
||||
.to_request();
|
||||
|
||||
let res = test::call_service(&app, req).await;
|
||||
snapshot!(res.status(), @"202 Accepted");
|
||||
|
||||
let bytes = test::read_body(res).await;
|
||||
let response = serde_json::from_slice::<Value>(&bytes).expect("Expecting valid json");
|
||||
snapshot!(json_string!(response, { ".enqueuedAt" => "[date]" }), @r###"
|
||||
{
|
||||
"taskUid": 25,
|
||||
"indexUid": "doggo",
|
||||
"status": "enqueued",
|
||||
"type": "indexCreation",
|
||||
"enqueuedAt": "[date]"
|
||||
}
|
||||
"###);
|
||||
|
||||
let body = serde_json::to_string(&json!({
|
||||
"uid": "girafo",
|
||||
"primaryKey": None::<&str>,
|
||||
}))
|
||||
.unwrap();
|
||||
let req = test::TestRequest::post()
|
||||
.uri("/indexes")
|
||||
.insert_header(("TaskId", "12"))
|
||||
.insert_header(ContentType::json())
|
||||
.set_payload(body)
|
||||
.to_request();
|
||||
|
||||
let res = test::call_service(&app, req).await;
|
||||
snapshot!(res.status(), @"400 Bad Request");
|
||||
|
||||
let bytes = test::read_body(res).await;
|
||||
let response = serde_json::from_slice::<Value>(&bytes).expect("Expecting valid json");
|
||||
snapshot!(json_string!(response), @r###"
|
||||
{
|
||||
"message": "Received bad task id: 12 should be >= to 26.",
|
||||
"code": "bad_request",
|
||||
"type": "invalid_request",
|
||||
"link": "https://docs.meilisearch.com/errors#bad_request"
|
||||
}
|
||||
"###);
|
||||
}
|
||||
|
@ -1,3 +1,4 @@
|
||||
use meili_snap::{json_string, snapshot};
|
||||
use once_cell::sync::Lazy;
|
||||
use serde_json::{json, Value};
|
||||
|
||||
@ -60,3 +61,59 @@ async fn geo_sort_with_geo_strings() {
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn geo_bounding_box_with_string_and_number() {
|
||||
let server = Server::new().await;
|
||||
let index = server.index("test");
|
||||
|
||||
let documents = DOCUMENTS.clone();
|
||||
index.update_settings_filterable_attributes(json!(["_geo"])).await;
|
||||
index.update_settings_sortable_attributes(json!(["_geo"])).await;
|
||||
index.add_documents(documents, None).await;
|
||||
index.wait_task(2).await;
|
||||
|
||||
index
|
||||
.search(
|
||||
json!({
|
||||
"filter": "_geoBoundingBox([89, 179], [-89, -179])",
|
||||
}),
|
||||
|response, code| {
|
||||
assert_eq!(code, 200, "{}", response);
|
||||
snapshot!(json_string!(response, { ".processingTimeMs" => "[time]" }), @r###"
|
||||
{
|
||||
"hits": [
|
||||
{
|
||||
"id": 1,
|
||||
"name": "Taco Truck",
|
||||
"address": "444 Salsa Street, Burritoville",
|
||||
"type": "Mexican",
|
||||
"rating": 9,
|
||||
"_geo": {
|
||||
"lat": 34.0522,
|
||||
"lng": -118.2437
|
||||
}
|
||||
},
|
||||
{
|
||||
"id": 2,
|
||||
"name": "La Bella Italia",
|
||||
"address": "456 Elm Street, Townsville",
|
||||
"type": "Italian",
|
||||
"rating": 9,
|
||||
"_geo": {
|
||||
"lat": "45.4777599",
|
||||
"lng": "9.1967508"
|
||||
}
|
||||
}
|
||||
],
|
||||
"query": "",
|
||||
"processingTimeMs": "[time]",
|
||||
"limit": 20,
|
||||
"offset": 0,
|
||||
"estimatedTotalHits": 2
|
||||
}
|
||||
"###);
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
@ -17,10 +17,10 @@ bincode = "1.3.3"
|
||||
bstr = "1.4.0"
|
||||
bytemuck = { version = "1.13.1", features = ["extern_crate_alloc"] }
|
||||
byteorder = "1.4.3"
|
||||
charabia = { version = "0.8.2", default-features = false }
|
||||
charabia = { version = "0.8.3", default-features = false }
|
||||
concat-arrays = "0.1.2"
|
||||
crossbeam-channel = "0.5.8"
|
||||
deserr = "0.5.0"
|
||||
deserr = { version = "0.6.0", features = ["actix-web"]}
|
||||
either = { version = "1.8.1", features = ["serde"] }
|
||||
flatten-serde-json = { path = "../flatten-serde-json" }
|
||||
fst = "0.4.7"
|
||||
@ -32,18 +32,18 @@ grenad = { version = "0.4.4", default-features = false, features = [
|
||||
heed = { git = "https://github.com/meilisearch/heed", tag = "v0.12.7", default-features = false, features = [
|
||||
"lmdb", "read-txn-no-tls"
|
||||
] }
|
||||
indexmap = { version = "1.9.3", features = ["serde"] }
|
||||
indexmap = { version = "2.0.0", features = ["serde"] }
|
||||
instant-distance = { version = "0.6.1", features = ["with-serde"] }
|
||||
json-depth-checker = { path = "../json-depth-checker" }
|
||||
levenshtein_automata = { version = "0.2.1", features = ["fst_automaton"] }
|
||||
memmap2 = "0.5.10"
|
||||
memmap2 = "0.7.1"
|
||||
obkv = "0.2.0"
|
||||
once_cell = "1.17.1"
|
||||
ordered-float = "3.6.0"
|
||||
rand_pcg = { version = "0.3.1", features = ["serde1"] }
|
||||
rayon = "1.7.0"
|
||||
roaring = "0.10.1"
|
||||
rstar = { version = "0.10.0", features = ["serde"] }
|
||||
rstar = { version = "0.11.0", features = ["serde"] }
|
||||
serde = { version = "1.0.160", features = ["derive"] }
|
||||
serde_json = { version = "1.0.95", features = ["preserve_order"] }
|
||||
slice-group-by = "0.3.0"
|
||||
@ -63,7 +63,7 @@ uuid = { version = "1.3.1", features = ["v4"] }
|
||||
filter-parser = { path = "../filter-parser" }
|
||||
|
||||
# documents words self-join
|
||||
itertools = "0.10.5"
|
||||
itertools = "0.11.0"
|
||||
|
||||
# profiling
|
||||
puffin = "0.16.0"
|
||||
|
@ -122,22 +122,28 @@ only composed of alphanumeric characters (a-z A-Z 0-9), hyphens (-) and undersco
|
||||
.field,
|
||||
match .valid_fields.is_empty() {
|
||||
true => "This index does not have configured sortable attributes.".to_string(),
|
||||
false => format!("Available sortable attributes are: `{}`.",
|
||||
valid_fields.iter().map(AsRef::as_ref).collect::<Vec<&str>>().join(", ")
|
||||
false => format!("Available sortable attributes are: `{}{}`.",
|
||||
valid_fields.iter().map(AsRef::as_ref).collect::<Vec<&str>>().join(", "),
|
||||
.hidden_fields.then_some(", <..hidden-attributes>").unwrap_or(""),
|
||||
),
|
||||
}
|
||||
)]
|
||||
InvalidSortableAttribute { field: String, valid_fields: BTreeSet<String> },
|
||||
InvalidSortableAttribute { field: String, valid_fields: BTreeSet<String>, hidden_fields: bool },
|
||||
#[error("Attribute `{}` is not facet-searchable. {}",
|
||||
.field,
|
||||
match .valid_fields.is_empty() {
|
||||
true => "This index does not have configured facet-searchable attributes. To make it facet-searchable add it to the `filterableAttributes` index settings.".to_string(),
|
||||
false => format!("Available facet-searchable attributes are: `{}`. To make it facet-searchable add it to the `filterableAttributes` index settings.",
|
||||
valid_fields.iter().map(AsRef::as_ref).collect::<Vec<&str>>().join(", ")
|
||||
false => format!("Available facet-searchable attributes are: `{}{}`. To make it facet-searchable add it to the `filterableAttributes` index settings.",
|
||||
valid_fields.iter().map(AsRef::as_ref).collect::<Vec<&str>>().join(", "),
|
||||
.hidden_fields.then_some(", <..hidden-attributes>").unwrap_or(""),
|
||||
),
|
||||
}
|
||||
)]
|
||||
InvalidFacetSearchFacetName { field: String, valid_fields: BTreeSet<String> },
|
||||
InvalidFacetSearchFacetName {
|
||||
field: String,
|
||||
valid_fields: BTreeSet<String>,
|
||||
hidden_fields: bool,
|
||||
},
|
||||
#[error("Attribute `{}` is not searchable. Available searchable attributes are: `{}{}`.",
|
||||
.field,
|
||||
.valid_fields.iter().map(AsRef::as_ref).collect::<Vec<&str>>().join(", "),
|
||||
@ -340,8 +346,11 @@ fn conditionally_lookup_for_error_message() {
|
||||
];
|
||||
|
||||
for (list, suffix) in messages {
|
||||
let err =
|
||||
UserError::InvalidSortableAttribute { field: "name".to_string(), valid_fields: list };
|
||||
let err = UserError::InvalidSortableAttribute {
|
||||
field: "name".to_string(),
|
||||
valid_fields: list,
|
||||
hidden_fields: false,
|
||||
};
|
||||
|
||||
assert_eq!(err.to_string(), format!("{} {}", prefix, suffix));
|
||||
}
|
||||
|
@ -20,7 +20,10 @@ pub use self::beu32_str_codec::BEU32StrCodec;
|
||||
pub use self::field_id_word_count_codec::FieldIdWordCountCodec;
|
||||
pub use self::fst_set_codec::FstSetCodec;
|
||||
pub use self::obkv_codec::ObkvCodec;
|
||||
pub use self::roaring_bitmap::{BoRoaringBitmapCodec, CboRoaringBitmapCodec, RoaringBitmapCodec};
|
||||
pub use self::roaring_bitmap::{
|
||||
BoRoaringBitmapCodec, CboRoaringBitmapCodec, CboRoaringTreemapCodec, RoaringBitmapCodec,
|
||||
RoaringTreemapCodec,
|
||||
};
|
||||
pub use self::roaring_bitmap_length::{
|
||||
BoRoaringBitmapLenCodec, CboRoaringBitmapLenCodec, RoaringBitmapLenCodec,
|
||||
};
|
||||
|
196
milli/src/heed_codec/roaring_bitmap/cbo_roaring_treemap_codec.rs
Normal file
196
milli/src/heed_codec/roaring_bitmap/cbo_roaring_treemap_codec.rs
Normal file
@ -0,0 +1,196 @@
|
||||
use std::borrow::Cow;
|
||||
use std::io;
|
||||
use std::mem::size_of;
|
||||
|
||||
use byteorder::{NativeEndian, ReadBytesExt, WriteBytesExt};
|
||||
use roaring::RoaringTreemap;
|
||||
|
||||
use crate::heed_codec::BytesDecodeOwned;
|
||||
|
||||
/// This is the limit where using a byteorder became less size efficient
|
||||
/// than using a direct roaring encoding, it is also the point where we are able
|
||||
/// to determine the encoding used only by using the array of bytes length.
|
||||
pub const THRESHOLD: usize = 4;
|
||||
|
||||
/// A conditionnal codec that either use the RoaringBitmap
|
||||
/// or a lighter ByteOrder en/decoding method.
|
||||
pub struct CboRoaringTreemapCodec;
|
||||
|
||||
impl CboRoaringTreemapCodec {
|
||||
pub fn serialized_size(roaring: &RoaringTreemap) -> usize {
|
||||
if roaring.len() <= THRESHOLD as u64 {
|
||||
roaring.len() as usize * size_of::<u64>()
|
||||
} else {
|
||||
roaring.serialized_size()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn serialize_into(roaring: &RoaringTreemap, vec: &mut Vec<u8>) {
|
||||
if roaring.len() <= THRESHOLD as u64 {
|
||||
// If the number of items (u32s) to encode is less than or equal to the threshold
|
||||
// it means that it would weigh the same or less than the RoaringBitmap
|
||||
// header, so we directly encode them using ByteOrder instead.
|
||||
for integer in roaring {
|
||||
vec.write_u64::<NativeEndian>(integer).unwrap();
|
||||
}
|
||||
} else {
|
||||
// Otherwise, we use the classic RoaringBitmapCodec that writes a header.
|
||||
roaring.serialize_into(vec).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn deserialize_from(mut bytes: &[u8]) -> io::Result<RoaringTreemap> {
|
||||
if bytes.len() <= THRESHOLD * size_of::<u64>() {
|
||||
// If there is threshold or less than threshold integers that can fit into this array
|
||||
// of bytes it means that we used the ByteOrder codec serializer.
|
||||
let mut bitmap = RoaringTreemap::new();
|
||||
while let Ok(integer) = bytes.read_u64::<NativeEndian>() {
|
||||
bitmap.insert(integer);
|
||||
}
|
||||
Ok(bitmap)
|
||||
} else {
|
||||
// Otherwise, it means we used the classic RoaringBitmapCodec and
|
||||
// that the header takes threshold integers.
|
||||
RoaringTreemap::deserialize_unchecked_from(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
/// Merge serialized CboRoaringBitmaps in a buffer.
|
||||
///
|
||||
/// if the merged values length is under the threshold, values are directly
|
||||
/// serialized in the buffer else a RoaringBitmap is created from the
|
||||
/// values and is serialized in the buffer.
|
||||
pub fn merge_into(slices: &[Cow<[u8]>], buffer: &mut Vec<u8>) -> io::Result<()> {
|
||||
let mut roaring = RoaringTreemap::new();
|
||||
let mut vec = Vec::new();
|
||||
|
||||
for bytes in slices {
|
||||
if bytes.len() <= THRESHOLD * size_of::<u64>() {
|
||||
let mut reader = bytes.as_ref();
|
||||
while let Ok(integer) = reader.read_u64::<NativeEndian>() {
|
||||
vec.push(integer);
|
||||
}
|
||||
} else {
|
||||
roaring |= RoaringTreemap::deserialize_unchecked_from(bytes.as_ref())?;
|
||||
}
|
||||
}
|
||||
|
||||
if roaring.is_empty() {
|
||||
vec.sort_unstable();
|
||||
vec.dedup();
|
||||
|
||||
if vec.len() <= THRESHOLD {
|
||||
for integer in vec {
|
||||
buffer.extend_from_slice(&integer.to_ne_bytes());
|
||||
}
|
||||
} else {
|
||||
// We can unwrap safely because the vector is sorted upper.
|
||||
let roaring = RoaringTreemap::from_sorted_iter(vec.into_iter()).unwrap();
|
||||
roaring.serialize_into(buffer)?;
|
||||
}
|
||||
} else {
|
||||
roaring.extend(vec);
|
||||
roaring.serialize_into(buffer)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl heed::BytesDecode<'_> for CboRoaringTreemapCodec {
|
||||
type DItem = RoaringTreemap;
|
||||
|
||||
fn bytes_decode(bytes: &[u8]) -> Option<Self::DItem> {
|
||||
Self::deserialize_from(bytes).ok()
|
||||
}
|
||||
}
|
||||
|
||||
impl BytesDecodeOwned for CboRoaringTreemapCodec {
|
||||
type DItem = RoaringTreemap;
|
||||
|
||||
fn bytes_decode_owned(bytes: &[u8]) -> Option<Self::DItem> {
|
||||
Self::deserialize_from(bytes).ok()
|
||||
}
|
||||
}
|
||||
|
||||
impl heed::BytesEncode<'_> for CboRoaringTreemapCodec {
|
||||
type EItem = RoaringTreemap;
|
||||
|
||||
fn bytes_encode(item: &Self::EItem) -> Option<Cow<[u8]>> {
|
||||
let mut vec = Vec::with_capacity(Self::serialized_size(item));
|
||||
Self::serialize_into(item, &mut vec);
|
||||
Some(Cow::Owned(vec))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::iter::FromIterator;
|
||||
|
||||
use heed::{BytesDecode, BytesEncode};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn verify_encoding_decoding() {
|
||||
let input = RoaringTreemap::from_iter(0..THRESHOLD as u64);
|
||||
let bytes = CboRoaringTreemapCodec::bytes_encode(&input).unwrap();
|
||||
let output = CboRoaringTreemapCodec::bytes_decode(&bytes).unwrap();
|
||||
assert_eq!(input, output);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn verify_threshold() {
|
||||
let input = RoaringTreemap::from_iter(0..THRESHOLD as u64);
|
||||
|
||||
// use roaring treemap
|
||||
let mut bytes = Vec::new();
|
||||
input.serialize_into(&mut bytes).unwrap();
|
||||
let roaring_size = bytes.len();
|
||||
|
||||
// use byteorder directly
|
||||
let mut bytes = Vec::new();
|
||||
for integer in input {
|
||||
bytes.write_u64::<NativeEndian>(integer).unwrap();
|
||||
}
|
||||
let bo_size = bytes.len();
|
||||
|
||||
assert!(roaring_size > bo_size, "roaring size: {}, bo size {}", roaring_size, bo_size);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn merge_cbo_roaring_bitmaps() {
|
||||
let mut buffer = Vec::new();
|
||||
|
||||
let small_data = vec![
|
||||
RoaringTreemap::from_sorted_iter(1..4).unwrap(),
|
||||
RoaringTreemap::from_sorted_iter(2..5).unwrap(),
|
||||
RoaringTreemap::from_sorted_iter(4..6).unwrap(),
|
||||
RoaringTreemap::from_sorted_iter(1..3).unwrap(),
|
||||
];
|
||||
|
||||
let small_data: Vec<_> =
|
||||
small_data.iter().map(|b| CboRoaringTreemapCodec::bytes_encode(b).unwrap()).collect();
|
||||
CboRoaringTreemapCodec::merge_into(small_data.as_slice(), &mut buffer).unwrap();
|
||||
let bitmap = CboRoaringTreemapCodec::deserialize_from(&buffer).unwrap();
|
||||
let expected = RoaringTreemap::from_sorted_iter(1..6).unwrap();
|
||||
assert_eq!(bitmap, expected);
|
||||
|
||||
let medium_data = vec![
|
||||
RoaringTreemap::from_sorted_iter(1..4).unwrap(),
|
||||
RoaringTreemap::from_sorted_iter(2..5).unwrap(),
|
||||
RoaringTreemap::from_sorted_iter(4..8).unwrap(),
|
||||
RoaringTreemap::from_sorted_iter(0..3).unwrap(),
|
||||
RoaringTreemap::from_sorted_iter(7..23).unwrap(),
|
||||
];
|
||||
|
||||
let medium_data: Vec<_> =
|
||||
medium_data.iter().map(|b| CboRoaringTreemapCodec::bytes_encode(b).unwrap()).collect();
|
||||
buffer.clear();
|
||||
CboRoaringTreemapCodec::merge_into(medium_data.as_slice(), &mut buffer).unwrap();
|
||||
|
||||
let bitmap = CboRoaringTreemapCodec::deserialize_from(&buffer).unwrap();
|
||||
let expected = RoaringTreemap::from_sorted_iter(0..23).unwrap();
|
||||
assert_eq!(bitmap, expected);
|
||||
}
|
||||
}
|
@ -1,7 +1,11 @@
|
||||
mod bo_roaring_bitmap_codec;
|
||||
pub mod cbo_roaring_bitmap_codec;
|
||||
pub mod cbo_roaring_treemap_codec;
|
||||
mod roaring_bitmap_codec;
|
||||
mod roaring_treemap_codec;
|
||||
|
||||
pub use self::bo_roaring_bitmap_codec::BoRoaringBitmapCodec;
|
||||
pub use self::cbo_roaring_bitmap_codec::CboRoaringBitmapCodec;
|
||||
pub use self::cbo_roaring_treemap_codec::CboRoaringTreemapCodec;
|
||||
pub use self::roaring_bitmap_codec::RoaringBitmapCodec;
|
||||
pub use self::roaring_treemap_codec::RoaringTreemapCodec;
|
||||
|
33
milli/src/heed_codec/roaring_bitmap/roaring_treemap_codec.rs
Normal file
33
milli/src/heed_codec/roaring_bitmap/roaring_treemap_codec.rs
Normal file
@ -0,0 +1,33 @@
|
||||
use std::borrow::Cow;
|
||||
|
||||
use roaring::RoaringTreemap;
|
||||
|
||||
use crate::heed_codec::BytesDecodeOwned;
|
||||
|
||||
pub struct RoaringTreemapCodec;
|
||||
|
||||
impl heed::BytesDecode<'_> for RoaringTreemapCodec {
|
||||
type DItem = RoaringTreemap;
|
||||
|
||||
fn bytes_decode(bytes: &[u8]) -> Option<Self::DItem> {
|
||||
RoaringTreemap::deserialize_unchecked_from(bytes).ok()
|
||||
}
|
||||
}
|
||||
|
||||
impl BytesDecodeOwned for RoaringTreemapCodec {
|
||||
type DItem = RoaringTreemap;
|
||||
|
||||
fn bytes_decode_owned(bytes: &[u8]) -> Option<Self::DItem> {
|
||||
RoaringTreemap::deserialize_from(bytes).ok()
|
||||
}
|
||||
}
|
||||
|
||||
impl heed::BytesEncode<'_> for RoaringTreemapCodec {
|
||||
type EItem = RoaringTreemap;
|
||||
|
||||
fn bytes_encode(item: &Self::EItem) -> Option<Cow<[u8]>> {
|
||||
let mut bytes = Vec::with_capacity(item.serialized_size());
|
||||
item.serialize_into(&mut bytes).ok()?;
|
||||
Some(Cow::Owned(bytes))
|
||||
}
|
||||
}
|
@ -655,6 +655,26 @@ impl Index {
|
||||
}
|
||||
}
|
||||
|
||||
/* remove hidden fields */
|
||||
pub fn remove_hidden_fields(
|
||||
&self,
|
||||
rtxn: &RoTxn,
|
||||
fields: impl IntoIterator<Item = impl AsRef<str>>,
|
||||
) -> Result<(BTreeSet<String>, bool)> {
|
||||
let mut valid_fields =
|
||||
fields.into_iter().map(|f| f.as_ref().to_string()).collect::<BTreeSet<String>>();
|
||||
|
||||
let fields_len = valid_fields.len();
|
||||
|
||||
if let Some(dn) = self.displayed_fields(rtxn)? {
|
||||
let displayable_names = dn.iter().map(|s| s.to_string()).collect();
|
||||
valid_fields = &valid_fields & &displayable_names;
|
||||
}
|
||||
|
||||
let hidden_fields = fields_len > valid_fields.len();
|
||||
Ok((valid_fields, hidden_fields))
|
||||
}
|
||||
|
||||
/* searchable fields */
|
||||
|
||||
/// Write the user defined searchable fields and generate the real searchable fields from the specified fields ids map.
|
||||
@ -1820,11 +1840,11 @@ pub(crate) mod tests {
|
||||
.unwrap();
|
||||
index
|
||||
.add_documents(documents!([
|
||||
{ "id": 0, "_geo": { "lat": 0, "lng": 0 } },
|
||||
{ "id": 1, "_geo": { "lat": 0, "lng": -175 } },
|
||||
{ "id": 2, "_geo": { "lat": 0, "lng": 175 } },
|
||||
{ "id": 0, "_geo": { "lat": "0", "lng": "0" } },
|
||||
{ "id": 1, "_geo": { "lat": 0, "lng": "-175" } },
|
||||
{ "id": 2, "_geo": { "lat": "0", "lng": 175 } },
|
||||
{ "id": 3, "_geo": { "lat": 85, "lng": 0 } },
|
||||
{ "id": 4, "_geo": { "lat": -85, "lng": 0 } },
|
||||
{ "id": 4, "_geo": { "lat": "-85", "lng": "0" } },
|
||||
]))
|
||||
.unwrap();
|
||||
|
||||
|
@ -97,7 +97,7 @@ const MAX_LMDB_KEY_LENGTH: usize = 500;
|
||||
///
|
||||
/// This number is determined by the keys of the different facet databases
|
||||
/// and adding a margin of safety.
|
||||
pub const MAX_FACET_VALUE_LENGTH: usize = MAX_LMDB_KEY_LENGTH - 20;
|
||||
pub const MAX_FACET_VALUE_LENGTH: usize = MAX_LMDB_KEY_LENGTH - 32;
|
||||
|
||||
/// The maximum length a word can be
|
||||
pub const MAX_WORD_LENGTH: usize = MAX_LMDB_KEY_LENGTH / 2;
|
||||
@ -293,15 +293,15 @@ pub fn normalize_facet(original: &str) -> String {
|
||||
#[derive(serde::Serialize, serde::Deserialize, Debug)]
|
||||
#[serde(transparent)]
|
||||
pub struct VectorOrArrayOfVectors {
|
||||
#[serde(with = "either::serde_untagged")]
|
||||
inner: either::Either<Vec<f32>, Vec<Vec<f32>>>,
|
||||
#[serde(with = "either::serde_untagged_optional")]
|
||||
inner: Option<either::Either<Vec<f32>, Vec<Vec<f32>>>>,
|
||||
}
|
||||
|
||||
impl VectorOrArrayOfVectors {
|
||||
pub fn into_array_of_vectors(self) -> Vec<Vec<f32>> {
|
||||
match self.inner {
|
||||
either::Either::Left(vector) => vec![vector],
|
||||
either::Either::Right(vectors) => vectors,
|
||||
pub fn into_array_of_vectors(self) -> Option<Vec<Vec<f32>>> {
|
||||
match self.inner? {
|
||||
either::Either::Left(vector) => Some(vec![vector]),
|
||||
either::Either::Right(vectors) => Some(vectors),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -280,9 +280,13 @@ impl<'a> SearchForFacetValues<'a> {
|
||||
|
||||
let filterable_fields = index.filterable_fields(rtxn)?;
|
||||
if !filterable_fields.contains(&self.facet) {
|
||||
let (valid_fields, hidden_fields) =
|
||||
index.remove_hidden_fields(rtxn, filterable_fields)?;
|
||||
|
||||
return Err(UserError::InvalidFacetSearchFacetName {
|
||||
field: self.facet.clone(),
|
||||
valid_fields: filterable_fields.into_iter().collect(),
|
||||
valid_fields,
|
||||
hidden_fields,
|
||||
}
|
||||
.into());
|
||||
}
|
||||
|
@ -91,11 +91,12 @@ pub fn bucket_sort<'ctx, Q: RankingRuleQueryTrait>(
|
||||
/// Update the universes accordingly and inform the logger.
|
||||
macro_rules! back {
|
||||
() => {
|
||||
assert!(
|
||||
ranking_rule_universes[cur_ranking_rule_index].is_empty(),
|
||||
"The ranking rule {} did not sort its bucket exhaustively",
|
||||
ranking_rules[cur_ranking_rule_index].id()
|
||||
);
|
||||
// FIXME: temporarily disabled assert: see <https://github.com/meilisearch/meilisearch/pull/4013>
|
||||
// assert!(
|
||||
// ranking_rule_universes[cur_ranking_rule_index].is_empty(),
|
||||
// "The ranking rule {} did not sort its bucket exhaustively",
|
||||
// ranking_rules[cur_ranking_rule_index].id()
|
||||
// );
|
||||
logger.end_iteration_ranking_rule(
|
||||
cur_ranking_rule_index,
|
||||
ranking_rules[cur_ranking_rule_index].as_ref(),
|
||||
|
@ -20,7 +20,7 @@ mod sort;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
use std::collections::{BTreeSet, HashSet};
|
||||
use std::collections::HashSet;
|
||||
|
||||
use bucket_sort::{bucket_sort, BucketSortOutput};
|
||||
use charabia::TokenizerBuilder;
|
||||
@ -108,24 +108,11 @@ impl<'ctx> SearchContext<'ctx> {
|
||||
(None, None) => continue,
|
||||
// The field is not searchable => User error
|
||||
(_fid, Some(false)) => {
|
||||
let mut valid_fields: BTreeSet<_> =
|
||||
fids_map.names().map(String::from).collect();
|
||||
let (valid_fields, hidden_fields) = match searchable_names {
|
||||
Some(sn) => self.index.remove_hidden_fields(self.txn, sn)?,
|
||||
None => self.index.remove_hidden_fields(self.txn, fids_map.names())?,
|
||||
};
|
||||
|
||||
// Filter by the searchable names
|
||||
if let Some(sn) = searchable_names {
|
||||
let searchable_names = sn.iter().map(|s| s.to_string()).collect();
|
||||
valid_fields = &valid_fields & &searchable_names;
|
||||
}
|
||||
|
||||
let searchable_count = valid_fields.len();
|
||||
|
||||
// Remove hidden fields
|
||||
if let Some(dn) = self.index.displayed_fields(self.txn)? {
|
||||
let displayable_names = dn.iter().map(|s| s.to_string()).collect();
|
||||
valid_fields = &valid_fields & &displayable_names;
|
||||
}
|
||||
|
||||
let hidden_fields = searchable_count > valid_fields.len();
|
||||
let field = field_name.to_string();
|
||||
return Err(UserError::InvalidSearchableAttribute {
|
||||
field,
|
||||
@ -604,16 +591,24 @@ fn check_sort_criteria(ctx: &SearchContext, sort_criteria: Option<&Vec<AscDesc>>
|
||||
for asc_desc in sort_criteria {
|
||||
match asc_desc.member() {
|
||||
Member::Field(ref field) if !crate::is_faceted(field, &sortable_fields) => {
|
||||
let (valid_fields, hidden_fields) =
|
||||
ctx.index.remove_hidden_fields(ctx.txn, sortable_fields)?;
|
||||
|
||||
return Err(UserError::InvalidSortableAttribute {
|
||||
field: field.to_string(),
|
||||
valid_fields: sortable_fields.into_iter().collect(),
|
||||
})?
|
||||
valid_fields,
|
||||
hidden_fields,
|
||||
})?;
|
||||
}
|
||||
Member::Geo(_) if !sortable_fields.contains("_geo") => {
|
||||
let (valid_fields, hidden_fields) =
|
||||
ctx.index.remove_hidden_fields(ctx.txn, sortable_fields)?;
|
||||
|
||||
return Err(UserError::InvalidSortableAttribute {
|
||||
field: "_geo".to_string(),
|
||||
valid_fields: sortable_fields.into_iter().collect(),
|
||||
})?
|
||||
valid_fields,
|
||||
hidden_fields,
|
||||
})?;
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
|
@ -94,7 +94,7 @@ use crate::heed_codec::facet::{FacetGroupKey, FacetGroupKeyCodec, FacetGroupValu
|
||||
use crate::heed_codec::ByteSliceRefCodec;
|
||||
use crate::update::index_documents::create_sorter;
|
||||
use crate::update::merge_btreeset_string;
|
||||
use crate::{BEU16StrCodec, Index, Result, BEU16};
|
||||
use crate::{BEU16StrCodec, Index, Result, BEU16, MAX_FACET_VALUE_LENGTH};
|
||||
|
||||
pub mod bulk;
|
||||
pub mod delete;
|
||||
@ -191,7 +191,16 @@ impl<'i> FacetsUpdate<'i> {
|
||||
for result in database.iter(wtxn)? {
|
||||
let (facet_group_key, ()) = result?;
|
||||
if let FacetGroupKey { field_id, level: 0, left_bound } = facet_group_key {
|
||||
let normalized_facet = left_bound.normalize(&options);
|
||||
let mut normalized_facet = left_bound.normalize(&options);
|
||||
let normalized_truncated_facet: String;
|
||||
if normalized_facet.len() > MAX_FACET_VALUE_LENGTH {
|
||||
normalized_truncated_facet = normalized_facet
|
||||
.char_indices()
|
||||
.take_while(|(idx, _)| *idx < MAX_FACET_VALUE_LENGTH)
|
||||
.map(|(_, c)| c)
|
||||
.collect();
|
||||
normalized_facet = normalized_truncated_facet.into();
|
||||
}
|
||||
let set = BTreeSet::from_iter(std::iter::once(left_bound));
|
||||
let key = (field_id, normalized_facet.as_ref());
|
||||
let key = BEU16StrCodec::bytes_encode(&key).ok_or(heed::Error::Encoding)?;
|
||||
|
@ -28,8 +28,8 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
|
||||
indexer: GrenadParameters,
|
||||
searchable_fields: &Option<HashSet<FieldId>>,
|
||||
stop_words: Option<&fst::Set<&[u8]>>,
|
||||
allowed_separators: Option<&Vec<&str>>,
|
||||
dictionary: Option<&Vec<&str>>,
|
||||
allowed_separators: Option<&[&str]>,
|
||||
dictionary: Option<&[&str]>,
|
||||
max_positions_per_attributes: Option<u32>,
|
||||
) -> Result<(RoaringBitmap, grenad::Reader<File>, ScriptLanguageDocidsMap)> {
|
||||
puffin::profile_function!();
|
||||
@ -55,12 +55,10 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
|
||||
tokenizer_builder.stop_words(stop_words);
|
||||
}
|
||||
if let Some(dictionary) = dictionary {
|
||||
// let dictionary: Vec<_> = dictionary.iter().map(String::as_str).collect();
|
||||
tokenizer_builder.words_dict(dictionary.as_slice());
|
||||
tokenizer_builder.words_dict(dictionary);
|
||||
}
|
||||
if let Some(separators) = allowed_separators {
|
||||
// let separators: Vec<_> = separators.iter().map(String::as_str).collect();
|
||||
tokenizer_builder.separators(separators.as_slice());
|
||||
tokenizer_builder.separators(separators);
|
||||
}
|
||||
let tokenizer = tokenizer_builder.build();
|
||||
|
||||
|
@ -46,7 +46,7 @@ pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
|
||||
if normalised_value.len() > MAX_FACET_VALUE_LENGTH {
|
||||
normalised_truncated_value = normalised_value
|
||||
.char_indices()
|
||||
.take_while(|(idx, _)| idx + 4 < MAX_FACET_VALUE_LENGTH)
|
||||
.take_while(|(idx, _)| *idx < MAX_FACET_VALUE_LENGTH)
|
||||
.map(|(_, c)| c)
|
||||
.collect();
|
||||
normalised_value = normalised_truncated_value.as_str();
|
||||
|
@ -28,11 +28,13 @@ pub struct ExtractedFacetValues {
|
||||
///
|
||||
/// Returns the generated grenad reader containing the docid the fid and the orginal value as key
|
||||
/// and the normalized value as value extracted from the given chunk of documents.
|
||||
/// We need the fid of the geofields to correctly parse them as numbers if they were sent as strings initially.
|
||||
#[logging_timer::time]
|
||||
pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
|
||||
obkv_documents: grenad::Reader<R>,
|
||||
indexer: GrenadParameters,
|
||||
faceted_fields: &HashSet<FieldId>,
|
||||
geo_fields_ids: Option<(FieldId, FieldId)>,
|
||||
) -> Result<ExtractedFacetValues> {
|
||||
puffin::profile_function!();
|
||||
|
||||
@ -84,7 +86,10 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
|
||||
|
||||
let value = from_slice(field_bytes).map_err(InternalError::SerdeJson)?;
|
||||
|
||||
match extract_facet_values(&value) {
|
||||
match extract_facet_values(
|
||||
&value,
|
||||
geo_fields_ids.map_or(false, |(lat, lng)| field_id == lat || field_id == lng),
|
||||
) {
|
||||
FilterableValues::Null => {
|
||||
facet_is_null_docids.entry(field_id).or_default().insert(document);
|
||||
}
|
||||
@ -177,12 +182,13 @@ enum FilterableValues {
|
||||
Values { numbers: Vec<f64>, strings: Vec<(String, String)> },
|
||||
}
|
||||
|
||||
fn extract_facet_values(value: &Value) -> FilterableValues {
|
||||
fn extract_facet_values(value: &Value, geo_field: bool) -> FilterableValues {
|
||||
fn inner_extract_facet_values(
|
||||
value: &Value,
|
||||
can_recurse: bool,
|
||||
output_numbers: &mut Vec<f64>,
|
||||
output_strings: &mut Vec<(String, String)>,
|
||||
geo_field: bool,
|
||||
) {
|
||||
match value {
|
||||
Value::Null => (),
|
||||
@ -193,13 +199,30 @@ fn extract_facet_values(value: &Value) -> FilterableValues {
|
||||
}
|
||||
}
|
||||
Value::String(original) => {
|
||||
// if we're working on a geofield it MUST be something we can parse or else there was an internal error
|
||||
// in the enrich pipeline. But since the enrich pipeline worked, we want to avoid crashing at all costs.
|
||||
if geo_field {
|
||||
if let Ok(float) = original.parse() {
|
||||
output_numbers.push(float);
|
||||
} else {
|
||||
log::warn!(
|
||||
"Internal error, could not parse a geofield that has been validated. Please open an issue."
|
||||
)
|
||||
}
|
||||
}
|
||||
let normalized = crate::normalize_facet(original);
|
||||
output_strings.push((normalized, original.clone()));
|
||||
}
|
||||
Value::Array(values) => {
|
||||
if can_recurse {
|
||||
for value in values {
|
||||
inner_extract_facet_values(value, false, output_numbers, output_strings);
|
||||
inner_extract_facet_values(
|
||||
value,
|
||||
false,
|
||||
output_numbers,
|
||||
output_strings,
|
||||
geo_field,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -215,7 +238,7 @@ fn extract_facet_values(value: &Value) -> FilterableValues {
|
||||
otherwise => {
|
||||
let mut numbers = Vec::new();
|
||||
let mut strings = Vec::new();
|
||||
inner_extract_facet_values(otherwise, true, &mut numbers, &mut strings);
|
||||
inner_extract_facet_values(otherwise, true, &mut numbers, &mut strings, geo_field);
|
||||
FilterableValues::Values { numbers, strings }
|
||||
}
|
||||
}
|
||||
|
@ -35,7 +35,7 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
|
||||
// lazily get it when needed
|
||||
let document_id = || -> Value {
|
||||
let document_id = obkv.get(primary_key_id).unwrap();
|
||||
serde_json::from_slice(document_id).unwrap()
|
||||
from_slice(document_id).unwrap()
|
||||
};
|
||||
|
||||
// first we retrieve the _vectors field
|
||||
@ -52,12 +52,14 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
|
||||
}
|
||||
};
|
||||
|
||||
for (i, vector) in vectors.into_iter().enumerate().take(u16::MAX as usize) {
|
||||
let index = u16::try_from(i).unwrap();
|
||||
let mut key = docid_bytes.to_vec();
|
||||
key.extend_from_slice(&index.to_be_bytes());
|
||||
let bytes = cast_slice(&vector);
|
||||
writer.insert(key, bytes)?;
|
||||
if let Some(vectors) = vectors {
|
||||
for (i, vector) in vectors.into_iter().enumerate().take(u16::MAX as usize) {
|
||||
let index = u16::try_from(i).unwrap();
|
||||
let mut key = docid_bytes.to_vec();
|
||||
key.extend_from_slice(&index.to_be_bytes());
|
||||
let bytes = cast_slice(&vector);
|
||||
writer.insert(key, bytes)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
// else => the `_vectors` object was `null`, there is nothing to do
|
||||
|
@ -49,8 +49,8 @@ pub(crate) fn data_from_obkv_documents(
|
||||
geo_fields_ids: Option<(FieldId, FieldId)>,
|
||||
vectors_field_id: Option<FieldId>,
|
||||
stop_words: Option<fst::Set<&[u8]>>,
|
||||
allowed_separators: Option<Vec<&str>>,
|
||||
dictionary: Option<Vec<&str>>,
|
||||
allowed_separators: Option<&[&str]>,
|
||||
dictionary: Option<&[&str]>,
|
||||
max_positions_per_attributes: Option<u32>,
|
||||
exact_attributes: HashSet<FieldId>,
|
||||
) -> Result<()> {
|
||||
@ -293,8 +293,8 @@ fn send_and_extract_flattened_documents_data(
|
||||
geo_fields_ids: Option<(FieldId, FieldId)>,
|
||||
vectors_field_id: Option<FieldId>,
|
||||
stop_words: &Option<fst::Set<&[u8]>>,
|
||||
allowed_separators: &Option<Vec<&str>>,
|
||||
dictionary: &Option<Vec<&str>>,
|
||||
allowed_separators: &Option<&[&str]>,
|
||||
dictionary: &Option<&[&str]>,
|
||||
max_positions_per_attributes: Option<u32>,
|
||||
) -> Result<(
|
||||
grenad::Reader<CursorClonableMmap>,
|
||||
@ -350,8 +350,8 @@ fn send_and_extract_flattened_documents_data(
|
||||
indexer,
|
||||
searchable_fields,
|
||||
stop_words.as_ref(),
|
||||
allowed_separators.as_ref(),
|
||||
dictionary.as_ref(),
|
||||
*allowed_separators,
|
||||
*dictionary,
|
||||
max_positions_per_attributes,
|
||||
)?;
|
||||
|
||||
@ -378,6 +378,7 @@ fn send_and_extract_flattened_documents_data(
|
||||
flattened_documents_chunk.clone(),
|
||||
indexer,
|
||||
faceted_fields,
|
||||
geo_fields_ids,
|
||||
)?;
|
||||
|
||||
// send docid_fid_facet_numbers_chunk to DB writer
|
||||
|
@ -359,8 +359,8 @@ where
|
||||
geo_fields_ids,
|
||||
vectors_field_id,
|
||||
stop_words,
|
||||
separators,
|
||||
dictionary,
|
||||
separators.as_deref(),
|
||||
dictionary.as_deref(),
|
||||
max_positions_per_attributes,
|
||||
exact_attributes,
|
||||
)
|
||||
|
Reference in New Issue
Block a user