Compare commits

...

2 Commits

Author SHA1 Message Date
6243422ff4 make the task id u64 2023-09-07 13:03:43 +02:00
3b69233906 let you specify your task id 2023-09-07 11:16:51 +02:00
42 changed files with 1122 additions and 513 deletions

View File

@ -7,7 +7,7 @@ use meilisearch_types::milli::update::IndexDocumentsMethod;
use meilisearch_types::settings::Unchecked;
use meilisearch_types::tasks::{Details, IndexSwap, KindWithContent, Status, Task, TaskId};
use meilisearch_types::InstanceUid;
use roaring::RoaringBitmap;
use roaring::RoaringTreemap;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
@ -121,11 +121,11 @@ pub enum KindDump {
},
TaskCancelation {
query: String,
tasks: RoaringBitmap,
tasks: RoaringTreemap,
},
TasksDeletion {
query: String,
tasks: RoaringBitmap,
tasks: RoaringTreemap,
},
DumpCreation {
keys: Vec<Key>,

View File

@ -69,7 +69,7 @@ impl CompatV5ToV6 {
}
let task = v6::Task {
uid: task_view.uid,
uid: task_view.uid as u64,
index_uid: task_view.index_uid,
status: match task_view.status {
v5::Status::Enqueued => v6::Status::Enqueued,

View File

@ -32,11 +32,11 @@ use meilisearch_types::milli::update::{
DeleteDocuments, DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod,
Settings as MilliSettings,
};
use meilisearch_types::milli::{self, Filter, BEU32};
use meilisearch_types::milli::{self, Filter, BEU64};
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
use meilisearch_types::{compression, Index, VERSION_FILE_NAME};
use roaring::RoaringBitmap;
use roaring::RoaringTreemap;
use time::macros::format_description;
use time::OffsetDateTime;
use uuid::Uuid;
@ -58,7 +58,7 @@ pub(crate) enum Batch {
/// The date and time at which the previously processing tasks started.
previous_started_at: OffsetDateTime,
/// The list of tasks that were processing when this task cancelation appeared.
previous_processing_tasks: RoaringBitmap,
previous_processing_tasks: RoaringTreemap,
},
TaskDeletion(Task),
SnapshotCreation(Vec<Task>),
@ -1065,7 +1065,13 @@ impl IndexScheduler {
}
/// Swap the index `lhs` with the index `rhs`.
fn apply_index_swap(&self, wtxn: &mut RwTxn, task_id: u32, lhs: &str, rhs: &str) -> Result<()> {
fn apply_index_swap(
&self,
wtxn: &mut RwTxn,
task_id: TaskId,
lhs: &str,
rhs: &str,
) -> Result<()> {
// 1. Verify that both lhs and rhs are existing indexes
let index_lhs_exists = self.index_mapper.index_exists(wtxn, lhs)?;
if !index_lhs_exists {
@ -1086,7 +1092,7 @@ impl IndexScheduler {
for task_id in &index_lhs_task_ids | &index_rhs_task_ids {
let mut task = self.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
swap_index_uid_in_task(&mut task, (lhs, rhs));
self.all_tasks.put(wtxn, &BEU32::new(task_id), &task)?;
self.all_tasks.put(wtxn, &BEU64::new(task_id), &task)?;
}
// 4. remove the task from indexuid = before_name
@ -1389,7 +1395,11 @@ impl IndexScheduler {
/// Delete each given task from all the databases (if it is deleteable).
///
/// Return the number of tasks that were actually deleted.
fn delete_matched_tasks(&self, wtxn: &mut RwTxn, matched_tasks: &RoaringBitmap) -> Result<u64> {
fn delete_matched_tasks(
&self,
wtxn: &mut RwTxn,
matched_tasks: &RoaringTreemap,
) -> Result<u64> {
// 1. Remove from this list the tasks that we are not allowed to delete
let enqueued_tasks = self.get_status(wtxn, Status::Enqueued)?;
let processing_tasks = &self.processing_tasks.read().unwrap().processing.clone();
@ -1404,7 +1414,7 @@ impl IndexScheduler {
let mut affected_indexes = HashSet::new();
let mut affected_statuses = HashSet::new();
let mut affected_kinds = HashSet::new();
let mut affected_canceled_by = RoaringBitmap::new();
let mut affected_canceled_by = RoaringTreemap::new();
for task_id in to_delete_tasks.iter() {
let task = self.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
@ -1441,10 +1451,10 @@ impl IndexScheduler {
}
for task in to_delete_tasks.iter() {
self.all_tasks.delete(wtxn, &BEU32::new(task))?;
self.all_tasks.delete(wtxn, &BEU64::new(task))?;
}
for canceled_by in affected_canceled_by {
let canceled_by = BEU32::new(canceled_by);
let canceled_by = BEU64::new(canceled_by);
if let Some(mut tasks) = self.canceled_by.get(wtxn, &canceled_by)? {
tasks -= &to_delete_tasks;
if tasks.is_empty() {
@ -1465,9 +1475,9 @@ impl IndexScheduler {
&self,
wtxn: &mut RwTxn,
cancel_task_id: TaskId,
matched_tasks: &RoaringBitmap,
matched_tasks: &RoaringTreemap,
previous_started_at: OffsetDateTime,
previous_processing_tasks: &RoaringBitmap,
previous_processing_tasks: &RoaringTreemap,
) -> Result<Vec<Uuid>> {
let now = OffsetDateTime::now_utc();
@ -1492,7 +1502,7 @@ impl IndexScheduler {
task.details = task.details.map(|d| d.to_failed());
self.update_task(wtxn, &task)?;
}
self.canceled_by.put(wtxn, &BEU32::new(cancel_task_id), &tasks_to_cancel)?;
self.canceled_by.put(wtxn, &BEU64::new(cancel_task_id), &tasks_to_cancel)?;
Ok(content_files_to_delete)
}

View File

@ -48,6 +48,8 @@ impl From<DateField> for Code {
pub enum Error {
#[error("{1}")]
WithCustomErrorCode(Code, Box<Self>),
#[error("Received bad task id: {received} should be >= to {expected}.")]
BadTaskId { received: TaskId, expected: TaskId },
#[error("Index `{0}` not found.")]
IndexNotFound(String),
#[error("Index `{0}` already exists.")]
@ -159,6 +161,7 @@ impl Error {
match self {
Error::IndexNotFound(_)
| Error::WithCustomErrorCode(_, _)
| Error::BadTaskId { .. }
| Error::IndexAlreadyExists(_)
| Error::SwapDuplicateIndexFound(_)
| Error::SwapDuplicateIndexesFound(_)
@ -202,6 +205,7 @@ impl ErrorCode for Error {
fn error_code(&self) -> Code {
match self {
Error::WithCustomErrorCode(code, _) => *code,
Error::BadTaskId { .. } => Code::BadRequest,
Error::IndexNotFound(_) => Code::IndexNotFound,
Error::IndexAlreadyExists(_) => Code::IndexAlreadyExists,
Error::SwapDuplicateIndexesFound(_) => Code::InvalidSwapDuplicateIndexFound,

View File

@ -3,9 +3,10 @@ use std::fmt::Write;
use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{Database, RoTxn};
use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};
use meilisearch_types::milli::heed_codec::{CboRoaringTreemapCodec, RoaringTreemapCodec};
use meilisearch_types::milli::BEU64;
use meilisearch_types::tasks::{Details, Task};
use roaring::RoaringBitmap;
use roaring::RoaringTreemap;
use crate::index_mapper::IndexMapper;
use crate::{IndexScheduler, Kind, Status, BEI128};
@ -47,7 +48,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
let processing_tasks = processing_tasks.read().unwrap().processing.clone();
snap.push_str(&format!("### Autobatching Enabled = {autobatching_enabled}\n"));
snap.push_str("### Processing Tasks:\n");
snap.push_str(&snapshot_bitmap(&processing_tasks));
snap.push_str(&snapshot_treemap(&processing_tasks));
snap.push_str("\n----------------------------------------------------------------------\n");
snap.push_str("### All Tasks:\n");
@ -103,7 +104,7 @@ pub fn snapshot_file_store(file_store: &file_store::FileStore) -> String {
snap
}
pub fn snapshot_bitmap(r: &RoaringBitmap) -> String {
pub fn snapshot_treemap(r: &RoaringTreemap) -> String {
let mut snap = String::new();
snap.push('[');
for x in r {
@ -113,7 +114,7 @@ pub fn snapshot_bitmap(r: &RoaringBitmap) -> String {
snap
}
pub fn snapshot_all_tasks(rtxn: &RoTxn, db: Database<OwnedType<BEU32>, SerdeJson<Task>>) -> String {
pub fn snapshot_all_tasks(rtxn: &RoTxn, db: Database<OwnedType<BEU64>, SerdeJson<Task>>) -> String {
let mut snap = String::new();
let iter = db.iter(rtxn).unwrap();
for next in iter {
@ -125,13 +126,13 @@ pub fn snapshot_all_tasks(rtxn: &RoTxn, db: Database<OwnedType<BEU32>, SerdeJson
pub fn snapshot_date_db(
rtxn: &RoTxn,
db: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
db: Database<OwnedType<BEI128>, CboRoaringTreemapCodec>,
) -> String {
let mut snap = String::new();
let iter = db.iter(rtxn).unwrap();
for next in iter {
let (_timestamp, task_ids) = next.unwrap();
snap.push_str(&format!("[timestamp] {}\n", snapshot_bitmap(&task_ids)));
snap.push_str(&format!("[timestamp] {}\n", snapshot_treemap(&task_ids)));
}
snap
}
@ -216,45 +217,48 @@ fn snapshot_details(d: &Details) -> String {
pub fn snapshot_status(
rtxn: &RoTxn,
db: Database<SerdeBincode<Status>, RoaringBitmapCodec>,
db: Database<SerdeBincode<Status>, RoaringTreemapCodec>,
) -> String {
let mut snap = String::new();
let iter = db.iter(rtxn).unwrap();
for next in iter {
let (status, task_ids) = next.unwrap();
writeln!(snap, "{status} {}", snapshot_bitmap(&task_ids)).unwrap();
writeln!(snap, "{status} {}", snapshot_treemap(&task_ids)).unwrap();
}
snap
}
pub fn snapshot_kind(rtxn: &RoTxn, db: Database<SerdeBincode<Kind>, RoaringBitmapCodec>) -> String {
let mut snap = String::new();
let iter = db.iter(rtxn).unwrap();
for next in iter {
let (kind, task_ids) = next.unwrap();
let kind = serde_json::to_string(&kind).unwrap();
writeln!(snap, "{kind} {}", snapshot_bitmap(&task_ids)).unwrap();
}
snap
}
pub fn snapshot_index_tasks(rtxn: &RoTxn, db: Database<Str, RoaringBitmapCodec>) -> String {
let mut snap = String::new();
let iter = db.iter(rtxn).unwrap();
for next in iter {
let (index, task_ids) = next.unwrap();
writeln!(snap, "{index} {}", snapshot_bitmap(&task_ids)).unwrap();
}
snap
}
pub fn snapshot_canceled_by(
pub fn snapshot_kind(
rtxn: &RoTxn,
db: Database<OwnedType<BEU32>, RoaringBitmapCodec>,
db: Database<SerdeBincode<Kind>, RoaringTreemapCodec>,
) -> String {
let mut snap = String::new();
let iter = db.iter(rtxn).unwrap();
for next in iter {
let (kind, task_ids) = next.unwrap();
writeln!(snap, "{kind} {}", snapshot_bitmap(&task_ids)).unwrap();
let kind = serde_json::to_string(&kind).unwrap();
writeln!(snap, "{kind} {}", snapshot_treemap(&task_ids)).unwrap();
}
snap
}
pub fn snapshot_index_tasks(rtxn: &RoTxn, db: Database<Str, RoaringTreemapCodec>) -> String {
let mut snap = String::new();
let iter = db.iter(rtxn).unwrap();
for next in iter {
let (index, task_ids) = next.unwrap();
writeln!(snap, "{index} {}", snapshot_treemap(&task_ids)).unwrap();
}
snap
}
pub fn snapshot_canceled_by(
rtxn: &RoTxn,
db: Database<OwnedType<BEU64>, RoaringTreemapCodec>,
) -> String {
let mut snap = String::new();
let iter = db.iter(rtxn).unwrap();
for next in iter {
let (kind, task_ids) = next.unwrap();
writeln!(snap, "{kind} {}", snapshot_treemap(&task_ids)).unwrap();
}
snap
}

File diff suppressed because it is too large Load Diff

View File

@ -7,7 +7,7 @@ source: index-scheduler/src/lib.rs
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: canceled, canceled_by: 1, details: { received_documents: 1, indexed_documents: Some(0) }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
1 {uid: 1, status: succeeded, details: { matched_tasks: 1, canceled_tasks: Some(1), original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringBitmap<[0]> }}
1 {uid: 1, status: succeeded, details: { matched_tasks: 1, canceled_tasks: Some(1), original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringTreemap<[0]> }}
----------------------------------------------------------------------
### Status:
enqueued []

View File

@ -7,7 +7,7 @@ source: index-scheduler/src/lib.rs
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
1 {uid: 1, status: enqueued, details: { matched_tasks: 1, canceled_tasks: None, original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringBitmap<[0]> }}
1 {uid: 1, status: enqueued, details: { matched_tasks: 1, canceled_tasks: None, original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringTreemap<[0]> }}
----------------------------------------------------------------------
### Status:
enqueued [0,1,]

View File

@ -9,7 +9,7 @@ source: index-scheduler/src/lib.rs
0 {uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "beavero", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
2 {uid: 2, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "wolfo", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000002, documents_count: 1, allow_index_creation: true }}
3 {uid: 3, status: enqueued, details: { matched_tasks: 3, canceled_tasks: None, original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringBitmap<[0, 1, 2]> }}
3 {uid: 3, status: enqueued, details: { matched_tasks: 3, canceled_tasks: None, original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringTreemap<[0, 1, 2]> }}
----------------------------------------------------------------------
### Status:
enqueued [1,2,3,]

View File

@ -9,7 +9,7 @@ source: index-scheduler/src/lib.rs
0 {uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
1 {uid: 1, status: canceled, canceled_by: 3, details: { received_documents: 1, indexed_documents: Some(0) }, kind: DocumentAdditionOrUpdate { index_uid: "beavero", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
2 {uid: 2, status: canceled, canceled_by: 3, details: { received_documents: 1, indexed_documents: Some(0) }, kind: DocumentAdditionOrUpdate { index_uid: "wolfo", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000002, documents_count: 1, allow_index_creation: true }}
3 {uid: 3, status: succeeded, details: { matched_tasks: 3, canceled_tasks: Some(2), original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringBitmap<[0, 1, 2]> }}
3 {uid: 3, status: succeeded, details: { matched_tasks: 3, canceled_tasks: Some(2), original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringTreemap<[0, 1, 2]> }}
----------------------------------------------------------------------
### Status:
enqueued []

View File

@ -9,7 +9,7 @@ source: index-scheduler/src/lib.rs
0 {uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "beavero", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
2 {uid: 2, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "wolfo", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000002, documents_count: 1, allow_index_creation: true }}
3 {uid: 3, status: enqueued, details: { matched_tasks: 3, canceled_tasks: None, original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringBitmap<[0, 1, 2]> }}
3 {uid: 3, status: enqueued, details: { matched_tasks: 3, canceled_tasks: None, original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringTreemap<[0, 1, 2]> }}
----------------------------------------------------------------------
### Status:
enqueued [1,2,3,]

View File

@ -7,7 +7,7 @@ source: index-scheduler/src/lib.rs
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
1 {uid: 1, status: enqueued, details: { matched_tasks: 1, canceled_tasks: None, original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringBitmap<[0]> }}
1 {uid: 1, status: enqueued, details: { matched_tasks: 1, canceled_tasks: None, original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringTreemap<[0]> }}
----------------------------------------------------------------------
### Status:
enqueued [0,1,]

View File

@ -7,7 +7,7 @@ source: index-scheduler/src/lib.rs
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: canceled, canceled_by: 1, details: { received_documents: 1, indexed_documents: Some(0) }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
1 {uid: 1, status: succeeded, details: { matched_tasks: 1, canceled_tasks: Some(1), original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringBitmap<[0]> }}
1 {uid: 1, status: succeeded, details: { matched_tasks: 1, canceled_tasks: Some(1), original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringTreemap<[0]> }}
----------------------------------------------------------------------
### Status:
enqueued []

View File

@ -7,7 +7,7 @@ source: index-scheduler/src/lib.rs
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
1 {uid: 1, status: enqueued, details: { matched_tasks: 1, canceled_tasks: None, original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringBitmap<[0]> }}
1 {uid: 1, status: enqueued, details: { matched_tasks: 1, canceled_tasks: None, original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringTreemap<[0]> }}
----------------------------------------------------------------------
### Status:
enqueued [0,1,]

View File

@ -7,7 +7,7 @@ source: index-scheduler/src/lib.rs
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
1 {uid: 1, status: succeeded, details: { matched_tasks: 1, canceled_tasks: Some(0), original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringBitmap<[0]> }}
1 {uid: 1, status: succeeded, details: { matched_tasks: 1, canceled_tasks: Some(0), original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringTreemap<[0]> }}
----------------------------------------------------------------------
### Status:
enqueued []

View File

@ -9,7 +9,7 @@ source: index-scheduler/src/lib.rs
0 {uid: 0, status: succeeded, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
1 {uid: 1, status: canceled, canceled_by: 3, details: { primary_key: Some("sheep") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("sheep") }}
2 {uid: 2, status: canceled, canceled_by: 3, details: { swaps: [IndexSwap { indexes: ("catto", "doggo") }] }, kind: IndexSwap { swaps: [IndexSwap { indexes: ("catto", "doggo") }] }}
3 {uid: 3, status: succeeded, details: { matched_tasks: 3, canceled_tasks: Some(0), original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringBitmap<[0, 1, 2]> }}
3 {uid: 3, status: succeeded, details: { matched_tasks: 3, canceled_tasks: Some(0), original_filter: "test_query" }, kind: TaskCancelation { query: "test_query", tasks: RoaringTreemap<[0, 1, 2]> }}
----------------------------------------------------------------------
### Status:
enqueued []

View File

@ -7,8 +7,8 @@ source: index-scheduler/src/lib.rs
----------------------------------------------------------------------
### All Tasks:
1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
2 {uid: 2, status: succeeded, details: { matched_tasks: 1, deleted_tasks: Some(1), original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringBitmap<[0]> }}
3 {uid: 3, status: succeeded, details: { matched_tasks: 1, deleted_tasks: Some(0), original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringBitmap<[0]> }}
2 {uid: 2, status: succeeded, details: { matched_tasks: 1, deleted_tasks: Some(1), original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringTreemap<[0]> }}
3 {uid: 3, status: succeeded, details: { matched_tasks: 1, deleted_tasks: Some(0), original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringTreemap<[0]> }}
----------------------------------------------------------------------
### Status:
enqueued [1,]

View File

@ -8,7 +8,7 @@ source: index-scheduler/src/lib.rs
### All Tasks:
0 {uid: 0, status: succeeded, details: { received_documents: 1, indexed_documents: Some(1) }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
2 {uid: 2, status: enqueued, details: { matched_tasks: 1, deleted_tasks: None, original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringBitmap<[0]> }}
2 {uid: 2, status: enqueued, details: { matched_tasks: 1, deleted_tasks: None, original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringTreemap<[0]> }}
----------------------------------------------------------------------
### Status:
enqueued [1,2,]

View File

@ -7,7 +7,7 @@ source: index-scheduler/src/lib.rs
----------------------------------------------------------------------
### All Tasks:
1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
2 {uid: 2, status: succeeded, details: { matched_tasks: 1, deleted_tasks: Some(1), original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringBitmap<[0]> }}
2 {uid: 2, status: succeeded, details: { matched_tasks: 1, deleted_tasks: Some(1), original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringTreemap<[0]> }}
----------------------------------------------------------------------
### Status:
enqueued [1,]

View File

@ -9,7 +9,7 @@ source: index-scheduler/src/lib.rs
0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
2 {uid: 2, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
3 {uid: 3, status: succeeded, details: { matched_tasks: 2, deleted_tasks: Some(0), original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringBitmap<[0, 1]> }}
3 {uid: 3, status: succeeded, details: { matched_tasks: 2, deleted_tasks: Some(0), original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringTreemap<[0, 1]> }}
----------------------------------------------------------------------
### Status:
enqueued [0,1,2,]

View File

@ -9,7 +9,7 @@ source: index-scheduler/src/lib.rs
0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
2 {uid: 2, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
3 {uid: 3, status: enqueued, details: { matched_tasks: 2, deleted_tasks: None, original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringBitmap<[0, 1]> }}
3 {uid: 3, status: enqueued, details: { matched_tasks: 2, deleted_tasks: None, original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringTreemap<[0, 1]> }}
----------------------------------------------------------------------
### Status:
enqueued [0,1,2,3,]

View File

@ -9,7 +9,7 @@ source: index-scheduler/src/lib.rs
0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
1 {uid: 1, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000000, documents_count: 1, allow_index_creation: true }}
2 {uid: 2, status: enqueued, details: { received_documents: 1, indexed_documents: None }, kind: DocumentAdditionOrUpdate { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: 00000000-0000-0000-0000-000000000001, documents_count: 1, allow_index_creation: true }}
3 {uid: 3, status: enqueued, details: { matched_tasks: 2, deleted_tasks: None, original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringBitmap<[0, 1]> }}
3 {uid: 3, status: enqueued, details: { matched_tasks: 2, deleted_tasks: None, original_filter: "test_query" }, kind: TaskDeletion { query: "test_query", tasks: RoaringTreemap<[0, 1]> }}
----------------------------------------------------------------------
### Status:
enqueued [0,1,2,3,]

View File

@ -41,6 +41,18 @@ source: index-scheduler/src/lib.rs
"taskDeletion": {
"query": "[query]",
"tasks": [
1,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
58,
48,
0,

View File

@ -21,6 +21,18 @@ source: index-scheduler/src/lib.rs
"taskDeletion": {
"query": "[query]",
"tasks": [
1,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
58,
48,
0,

View File

@ -106,6 +106,18 @@ source: index-scheduler/src/lib.rs
"taskDeletion": {
"query": "[query]",
"tasks": [
1,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
58,
48,
0,

View File

@ -61,6 +61,18 @@ source: index-scheduler/src/lib.rs
"taskDeletion": {
"query": "[query]",
"tasks": [
1,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
0,
58,
48,
0,

View File

@ -5,15 +5,16 @@ use std::ops::Bound;
use meilisearch_types::heed::types::{DecodeIgnore, OwnedType};
use meilisearch_types::heed::{Database, RoTxn, RwTxn};
use meilisearch_types::milli::{CboRoaringBitmapCodec, BEU32};
use meilisearch_types::milli::heed_codec::CboRoaringTreemapCodec;
use meilisearch_types::milli::BEU64;
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status};
use roaring::{MultiOps, RoaringBitmap};
use roaring::{MultiOps, RoaringTreemap};
use time::OffsetDateTime;
use crate::{Error, IndexScheduler, Result, Task, TaskId, BEI128};
impl IndexScheduler {
pub(crate) fn all_task_ids(&self, rtxn: &RoTxn) -> Result<RoaringBitmap> {
pub(crate) fn all_task_ids(&self, rtxn: &RoTxn) -> Result<RoaringTreemap> {
enum_iterator::all().map(|s| self.get_status(rtxn, s)).union()
}
@ -26,7 +27,7 @@ impl IndexScheduler {
}
pub(crate) fn get_task(&self, rtxn: &RoTxn, task_id: TaskId) -> Result<Option<Task>> {
Ok(self.all_tasks.get(rtxn, &BEU32::new(task_id))?)
Ok(self.all_tasks.get(rtxn, &BEU64::new(task_id))?)
}
/// Convert an iterator to a `Vec` of tasks. The tasks MUST exist or a
@ -88,12 +89,12 @@ impl IndexScheduler {
}
}
self.all_tasks.put(wtxn, &BEU32::new(task.uid), task)?;
self.all_tasks.put(wtxn, &BEU64::new(task.uid), task)?;
Ok(())
}
/// Returns the whole set of tasks that belongs to this index.
pub(crate) fn index_tasks(&self, rtxn: &RoTxn, index: &str) -> Result<RoaringBitmap> {
pub(crate) fn index_tasks(&self, rtxn: &RoTxn, index: &str) -> Result<RoaringTreemap> {
Ok(self.index_tasks.get(rtxn, index)?.unwrap_or_default())
}
@ -101,7 +102,7 @@ impl IndexScheduler {
&self,
wtxn: &mut RwTxn,
index: &str,
f: impl Fn(&mut RoaringBitmap),
f: impl Fn(&mut RoaringTreemap),
) -> Result<()> {
let mut tasks = self.index_tasks(wtxn, index)?;
f(&mut tasks);
@ -114,7 +115,7 @@ impl IndexScheduler {
Ok(())
}
pub(crate) fn get_status(&self, rtxn: &RoTxn, status: Status) -> Result<RoaringBitmap> {
pub(crate) fn get_status(&self, rtxn: &RoTxn, status: Status) -> Result<RoaringTreemap> {
Ok(self.status.get(rtxn, &status)?.unwrap_or_default())
}
@ -122,7 +123,7 @@ impl IndexScheduler {
&self,
wtxn: &mut RwTxn,
status: Status,
bitmap: &RoaringBitmap,
bitmap: &RoaringTreemap,
) -> Result<()> {
Ok(self.status.put(wtxn, &status, bitmap)?)
}
@ -131,7 +132,7 @@ impl IndexScheduler {
&self,
wtxn: &mut RwTxn,
status: Status,
f: impl Fn(&mut RoaringBitmap),
f: impl Fn(&mut RoaringTreemap),
) -> Result<()> {
let mut tasks = self.get_status(wtxn, status)?;
f(&mut tasks);
@ -140,7 +141,7 @@ impl IndexScheduler {
Ok(())
}
pub(crate) fn get_kind(&self, rtxn: &RoTxn, kind: Kind) -> Result<RoaringBitmap> {
pub(crate) fn get_kind(&self, rtxn: &RoTxn, kind: Kind) -> Result<RoaringTreemap> {
Ok(self.kind.get(rtxn, &kind)?.unwrap_or_default())
}
@ -148,7 +149,7 @@ impl IndexScheduler {
&self,
wtxn: &mut RwTxn,
kind: Kind,
bitmap: &RoaringBitmap,
bitmap: &RoaringTreemap,
) -> Result<()> {
Ok(self.kind.put(wtxn, &kind, bitmap)?)
}
@ -157,7 +158,7 @@ impl IndexScheduler {
&self,
wtxn: &mut RwTxn,
kind: Kind,
f: impl Fn(&mut RoaringBitmap),
f: impl Fn(&mut RoaringTreemap),
) -> Result<()> {
let mut tasks = self.get_kind(wtxn, kind)?;
f(&mut tasks);
@ -169,20 +170,20 @@ impl IndexScheduler {
pub(crate) fn insert_task_datetime(
wtxn: &mut RwTxn,
database: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
database: Database<OwnedType<BEI128>, CboRoaringTreemapCodec>,
time: OffsetDateTime,
task_id: TaskId,
) -> Result<()> {
let timestamp = BEI128::new(time.unix_timestamp_nanos());
let mut task_ids = database.get(wtxn, &timestamp)?.unwrap_or_default();
task_ids.insert(task_id);
database.put(wtxn, &timestamp, &RoaringBitmap::from_iter(task_ids))?;
database.put(wtxn, &timestamp, &RoaringTreemap::from_iter(task_ids))?;
Ok(())
}
pub(crate) fn remove_task_datetime(
wtxn: &mut RwTxn,
database: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
database: Database<OwnedType<BEI128>, CboRoaringTreemapCodec>,
time: OffsetDateTime,
task_id: TaskId,
) -> Result<()> {
@ -192,7 +193,7 @@ pub(crate) fn remove_task_datetime(
if existing.is_empty() {
database.delete(wtxn, &timestamp)?;
} else {
database.put(wtxn, &timestamp, &RoaringBitmap::from_iter(existing))?;
database.put(wtxn, &timestamp, &RoaringTreemap::from_iter(existing))?;
}
}
@ -201,8 +202,8 @@ pub(crate) fn remove_task_datetime(
pub(crate) fn keep_tasks_within_datetimes(
rtxn: &RoTxn,
tasks: &mut RoaringBitmap,
database: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
tasks: &mut RoaringTreemap,
database: Database<OwnedType<BEI128>, CboRoaringTreemapCodec>,
after: Option<OffsetDateTime>,
before: Option<OffsetDateTime>,
) -> Result<()> {
@ -212,7 +213,7 @@ pub(crate) fn keep_tasks_within_datetimes(
(Some(after), None) => (Bound::Excluded(*after), Bound::Unbounded),
(Some(after), Some(before)) => (Bound::Excluded(*after), Bound::Excluded(*before)),
};
let mut collected_task_ids = RoaringBitmap::new();
let mut collected_task_ids = RoaringTreemap::new();
let start = map_bound(start, |b| BEI128::new(b.unix_timestamp_nanos()));
let end = map_bound(end, |b| BEI128::new(b.unix_timestamp_nanos()));
let iter = database.range(rtxn, &(start, end))?;

View File

@ -104,6 +104,7 @@ macro_rules! impl_from_query_param_wrap_original_value_in_error {
}
impl_from_query_param_wrap_original_value_in_error!(usize, DeserrParseIntError);
impl_from_query_param_wrap_original_value_in_error!(u32, DeserrParseIntError);
impl_from_query_param_wrap_original_value_in_error!(u64, DeserrParseIntError);
impl_from_query_param_wrap_original_value_in_error!(bool, DeserrParseBoolError);
impl FromQueryParameter for String {

View File

@ -5,7 +5,7 @@ use std::str::FromStr;
use enum_iterator::Sequence;
use milli::update::IndexDocumentsMethod;
use roaring::RoaringBitmap;
use roaring::RoaringTreemap;
use serde::{Deserialize, Serialize, Serializer};
use time::{Duration, OffsetDateTime};
use uuid::Uuid;
@ -15,7 +15,7 @@ use crate::keys::Key;
use crate::settings::{Settings, Unchecked};
use crate::InstanceUid;
pub type TaskId = u32;
pub type TaskId = u64;
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
@ -127,11 +127,11 @@ pub enum KindWithContent {
},
TaskCancelation {
query: String,
tasks: RoaringBitmap,
tasks: RoaringTreemap,
},
TaskDeletion {
query: String,
tasks: RoaringBitmap,
tasks: RoaringTreemap,
},
DumpCreation {
keys: Vec<Key>,

View File

@ -203,7 +203,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
.name(String::from("register-snapshot-tasks"))
.spawn(move || loop {
thread::sleep(snapshot_delay);
if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation) {
if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation, None) {
error!("Error while registering snapshot: {}", e);
}
})

View File

@ -11,7 +11,7 @@ use crate::analytics::Analytics;
use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData;
use crate::extractors::sequential_extractor::SeqHandler;
use crate::routes::SummarizedTaskView;
use crate::routes::{get_task_id, SummarizedTaskView};
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(web::resource("").route(web::post().to(SeqHandler(create_dump))));
@ -29,8 +29,9 @@ pub async fn create_dump(
keys: auth_controller.list_keys()?,
instance_uid: analytics.instance_uid().cloned(),
};
let uid = get_task_id(&req)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))

View File

@ -7,7 +7,7 @@ use bstr::ByteSlice;
use deserr::actix_web::{AwebJson, AwebQueryParameter};
use deserr::Deserr;
use futures::StreamExt;
use index_scheduler::IndexScheduler;
use index_scheduler::{IndexScheduler, TaskId};
use log::debug;
use meilisearch_types::deserr::query_params::Param;
use meilisearch_types::deserr::{DeserrJsonError, DeserrQueryParamError};
@ -36,7 +36,7 @@ use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData;
use crate::extractors::payload::Payload;
use crate::extractors::sequential_extractor::SeqHandler;
use crate::routes::{PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT};
use crate::routes::{get_task_id, PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT};
use crate::search::parse_filter;
static ACCEPTED_CONTENT_TYPE: Lazy<Vec<String>> = Lazy::new(|| {
@ -129,8 +129,9 @@ pub async fn delete_document(
index_uid: index_uid.to_string(),
documents_ids: vec![document_id],
};
let uid = get_task_id(&req)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))
}
@ -277,6 +278,7 @@ pub async fn replace_documents(
analytics.add_documents(&params, index_scheduler.index(&index_uid).is_err(), &req);
let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid);
let uid = get_task_id(&req)?;
let task = document_addition(
extract_mime_type(&req)?,
index_scheduler,
@ -285,6 +287,7 @@ pub async fn replace_documents(
params.csv_delimiter,
body,
IndexDocumentsMethod::ReplaceDocuments,
uid,
allow_index_creation,
)
.await?;
@ -308,6 +311,7 @@ pub async fn update_documents(
analytics.update_documents(&params, index_scheduler.index(&index_uid).is_err(), &req);
let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid);
let uid = get_task_id(&req)?;
let task = document_addition(
extract_mime_type(&req)?,
index_scheduler,
@ -316,6 +320,7 @@ pub async fn update_documents(
params.csv_delimiter,
body,
IndexDocumentsMethod::UpdateDocuments,
uid,
allow_index_creation,
)
.await?;
@ -332,6 +337,7 @@ async fn document_addition(
csv_delimiter: Option<u8>,
mut body: Payload,
method: IndexDocumentsMethod,
task_id: Option<TaskId>,
allow_index_creation: bool,
) -> Result<SummarizedTaskView, MeilisearchHttpError> {
let format = match (
@ -445,7 +451,7 @@ async fn document_addition(
};
let scheduler = index_scheduler.clone();
let task = match tokio::task::spawn_blocking(move || scheduler.register(task)).await? {
let task = match tokio::task::spawn_blocking(move || scheduler.register(task, task_id)).await? {
Ok(task) => task,
Err(e) => {
index_scheduler.delete_update_file(uuid)?;
@ -476,8 +482,9 @@ pub async fn delete_documents_batch(
let task =
KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids };
let uid = get_task_id(&req)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))
@ -512,8 +519,9 @@ pub async fn delete_documents_by_filter(
.map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?;
let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter };
let uid = get_task_id(&req)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))
@ -529,8 +537,9 @@ pub async fn clear_all_documents(
analytics.delete_documents(DocumentDeletionKind::ClearAll, &req);
let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() };
let uid = get_task_id(&req)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))

View File

@ -17,7 +17,7 @@ use serde::Serialize;
use serde_json::json;
use time::OffsetDateTime;
use super::{Pagination, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT};
use super::{get_task_id, Pagination, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT};
use crate::analytics::Analytics;
use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::{AuthenticationError, GuardedData};
@ -135,8 +135,9 @@ pub async fn create_index(
);
let task = KindWithContent::IndexCreation { index_uid: uid.to_string(), primary_key };
let uid = get_task_id(&req)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
Ok(HttpResponse::Accepted().json(task))
} else {
@ -203,8 +204,9 @@ pub async fn update_index(
primary_key: body.primary_key,
};
let uid = get_task_id(&req)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))
@ -213,11 +215,13 @@ pub async fn update_index(
pub async fn delete_index(
index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_DELETE }>, Data<IndexScheduler>>,
index_uid: web::Path<String>,
req: HttpRequest,
) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
let task = KindWithContent::IndexDeletion { index_uid: index_uid.into_inner() };
let uid = get_task_id(&req)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
Ok(HttpResponse::Accepted().json(task))
}

View File

@ -14,7 +14,7 @@ use serde_json::json;
use crate::analytics::Analytics;
use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData;
use crate::routes::SummarizedTaskView;
use crate::routes::{get_task_id, SummarizedTaskView};
#[macro_export]
macro_rules! make_setting_route {
@ -33,7 +33,7 @@ macro_rules! make_setting_route {
use $crate::extractors::authentication::policies::*;
use $crate::extractors::authentication::GuardedData;
use $crate::extractors::sequential_extractor::SeqHandler;
use $crate::routes::SummarizedTaskView;
use $crate::routes::{get_task_id, SummarizedTaskView};
pub async fn delete(
index_scheduler: GuardedData<
@ -41,6 +41,7 @@ macro_rules! make_setting_route {
Data<IndexScheduler>,
>,
index_uid: web::Path<String>,
req: HttpRequest,
) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
@ -55,8 +56,9 @@ macro_rules! make_setting_route {
is_deletion: true,
allow_index_creation,
};
let uid = get_task_id(&req)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task))
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid))
.await??
.into();
@ -97,8 +99,9 @@ macro_rules! make_setting_route {
is_deletion: false,
allow_index_creation,
};
let uid = get_task_id(&req)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task))
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid))
.await??
.into();
@ -664,8 +667,9 @@ pub async fn update_all(
is_deletion: false,
allow_index_creation,
};
let uid = get_task_id(&req)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))
@ -687,6 +691,7 @@ pub async fn get_all(
pub async fn delete_all(
index_scheduler: GuardedData<ActionPolicy<{ actions::SETTINGS_UPDATE }>, Data<IndexScheduler>>,
index_uid: web::Path<String>,
req: HttpRequest,
) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
@ -700,8 +705,9 @@ pub async fn delete_all(
is_deletion: true,
allow_index_creation,
};
let uid = get_task_id(&req)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))

View File

@ -5,7 +5,7 @@ use actix_web::{web, HttpRequest, HttpResponse};
use index_scheduler::IndexScheduler;
use log::debug;
use meilisearch_auth::AuthController;
use meilisearch_types::error::ResponseError;
use meilisearch_types::error::{Code, ResponseError};
use meilisearch_types::settings::{Settings, Unchecked};
use meilisearch_types::tasks::{Kind, Status, Task, TaskId};
use serde::{Deserialize, Serialize};
@ -41,6 +41,34 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
.service(web::scope("/experimental-features").configure(features::configure));
}
pub fn get_task_id(req: &HttpRequest) -> Result<Option<TaskId>, ResponseError> {
let task_id = req
.headers()
.get("TaskId")
.map(|header| {
header.to_str().map_err(|e| {
ResponseError::from_msg(
format!("TaskId is not a valid utf-8 string: {e}"),
Code::BadRequest,
)
})
})
.transpose()?
.map(|s| {
s.parse::<TaskId>().map_err(|e| {
ResponseError::from_msg(
format!(
"Could not parse the TaskId as a {}: {e}",
std::any::type_name::<TaskId>(),
),
Code::BadRequest,
)
})
})
.transpose()?;
Ok(task_id)
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SummarizedTaskView {

View File

@ -10,7 +10,7 @@ use meilisearch_types::index_uid::IndexUid;
use meilisearch_types::tasks::{IndexSwap, KindWithContent};
use serde_json::json;
use super::SummarizedTaskView;
use super::{get_task_id, SummarizedTaskView};
use crate::analytics::Analytics;
use crate::error::MeilisearchHttpError;
use crate::extractors::authentication::policies::*;
@ -61,7 +61,8 @@ pub async fn swap_indexes(
let task = KindWithContent::IndexSwap { swaps };
let task = index_scheduler.register(task)?;
let task: SummarizedTaskView = task.into();
let uid = get_task_id(&req)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid)).await??.into();
Ok(HttpResponse::Accepted().json(task))
}

View File

@ -20,13 +20,13 @@ use time::macros::format_description;
use time::{Date, Duration, OffsetDateTime, Time};
use tokio::task;
use super::SummarizedTaskView;
use super::{get_task_id, SummarizedTaskView};
use crate::analytics::Analytics;
use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData;
use crate::extractors::sequential_extractor::SeqHandler;
const DEFAULT_LIMIT: u32 = 20;
const DEFAULT_LIMIT: u64 = 20;
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(
@ -175,14 +175,14 @@ impl From<Details> for DetailsView {
#[deserr(error = DeserrQueryParamError, rename_all = camelCase, deny_unknown_fields)]
pub struct TasksFilterQuery {
#[deserr(default = Param(DEFAULT_LIMIT), error = DeserrQueryParamError<InvalidTaskLimit>)]
pub limit: Param<u32>,
pub limit: Param<TaskId>,
#[deserr(default, error = DeserrQueryParamError<InvalidTaskFrom>)]
pub from: Option<Param<TaskId>>,
#[deserr(default, error = DeserrQueryParamError<InvalidTaskUids>)]
pub uids: OptionStarOrList<u32>,
pub uids: OptionStarOrList<TaskId>,
#[deserr(default, error = DeserrQueryParamError<InvalidTaskCanceledBy>)]
pub canceled_by: OptionStarOrList<u32>,
pub canceled_by: OptionStarOrList<TaskId>,
#[deserr(default, error = DeserrQueryParamError<InvalidTaskTypes>)]
pub types: OptionStarOrList<Kind>,
#[deserr(default, error = DeserrQueryParamError<InvalidTaskStatuses>)]
@ -249,9 +249,9 @@ impl TaskDeletionOrCancelationQuery {
#[deserr(error = DeserrQueryParamError, rename_all = camelCase, deny_unknown_fields)]
pub struct TaskDeletionOrCancelationQuery {
#[deserr(default, error = DeserrQueryParamError<InvalidTaskUids>)]
pub uids: OptionStarOrList<u32>,
pub uids: OptionStarOrList<TaskId>,
#[deserr(default, error = DeserrQueryParamError<InvalidTaskCanceledBy>)]
pub canceled_by: OptionStarOrList<u32>,
pub canceled_by: OptionStarOrList<TaskId>,
#[deserr(default, error = DeserrQueryParamError<InvalidTaskTypes>)]
pub types: OptionStarOrList<Kind>,
#[deserr(default, error = DeserrQueryParamError<InvalidTaskStatuses>)]
@ -333,7 +333,9 @@ async fn cancel_tasks(
let task_cancelation =
KindWithContent::TaskCancelation { query: format!("?{}", req.query_string()), tasks };
let task = task::spawn_blocking(move || index_scheduler.register(task_cancelation)).await??;
let uid = get_task_id(&req)?;
let task =
task::spawn_blocking(move || index_scheduler.register(task_cancelation, uid)).await??;
let task: SummarizedTaskView = task.into();
Ok(HttpResponse::Ok().json(task))
@ -378,7 +380,8 @@ async fn delete_tasks(
let task_deletion =
KindWithContent::TaskDeletion { query: format!("?{}", req.query_string()), tasks };
let task = task::spawn_blocking(move || index_scheduler.register(task_deletion)).await??;
let uid = get_task_id(&req)?;
let task = task::spawn_blocking(move || index_scheduler.register(task_deletion, uid)).await??;
let task: SummarizedTaskView = task.into();
Ok(HttpResponse::Ok().json(task))
@ -388,9 +391,9 @@ async fn delete_tasks(
pub struct AllTasks {
results: Vec<TaskView>,
total: u64,
limit: u32,
from: Option<u32>,
next: Option<u32>,
limit: TaskId,
from: Option<TaskId>,
next: Option<TaskId>,
}
async fn get_tasks(

View File

@ -199,3 +199,74 @@ async fn error_create_with_invalid_index_uid() {
}
"###);
}
#[actix_rt::test]
async fn send_task_id() {
let server = Server::new().await;
let app = server.init_web_app().await;
let index = server.index("catto");
let (response, code) = index.create(None).await;
snapshot!(code, @"202 Accepted");
snapshot!(json_string!(response, { ".enqueuedAt" => "[date]" }), @r###"
{
"taskUid": 0,
"indexUid": "catto",
"status": "enqueued",
"type": "indexCreation",
"enqueuedAt": "[date]"
}
"###);
let body = serde_json::to_string(&json!({
"uid": "doggo",
"primaryKey": None::<&str>,
}))
.unwrap();
let req = test::TestRequest::post()
.uri("/indexes")
.insert_header(("TaskId", "25"))
.insert_header(ContentType::json())
.set_payload(body)
.to_request();
let res = test::call_service(&app, req).await;
snapshot!(res.status(), @"202 Accepted");
let bytes = test::read_body(res).await;
let response = serde_json::from_slice::<Value>(&bytes).expect("Expecting valid json");
snapshot!(json_string!(response, { ".enqueuedAt" => "[date]" }), @r###"
{
"taskUid": 25,
"indexUid": "doggo",
"status": "enqueued",
"type": "indexCreation",
"enqueuedAt": "[date]"
}
"###);
let body = serde_json::to_string(&json!({
"uid": "girafo",
"primaryKey": None::<&str>,
}))
.unwrap();
let req = test::TestRequest::post()
.uri("/indexes")
.insert_header(("TaskId", "12"))
.insert_header(ContentType::json())
.set_payload(body)
.to_request();
let res = test::call_service(&app, req).await;
snapshot!(res.status(), @"400 Bad Request");
let bytes = test::read_body(res).await;
let response = serde_json::from_slice::<Value>(&bytes).expect("Expecting valid json");
snapshot!(json_string!(response), @r###"
{
"message": "Received bad task id: 12 should be >= to 26.",
"code": "bad_request",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#bad_request"
}
"###);
}

View File

@ -20,7 +20,10 @@ pub use self::beu32_str_codec::BEU32StrCodec;
pub use self::field_id_word_count_codec::FieldIdWordCountCodec;
pub use self::fst_set_codec::FstSetCodec;
pub use self::obkv_codec::ObkvCodec;
pub use self::roaring_bitmap::{BoRoaringBitmapCodec, CboRoaringBitmapCodec, RoaringBitmapCodec};
pub use self::roaring_bitmap::{
BoRoaringBitmapCodec, CboRoaringBitmapCodec, CboRoaringTreemapCodec, RoaringBitmapCodec,
RoaringTreemapCodec,
};
pub use self::roaring_bitmap_length::{
BoRoaringBitmapLenCodec, CboRoaringBitmapLenCodec, RoaringBitmapLenCodec,
};

View File

@ -0,0 +1,196 @@
use std::borrow::Cow;
use std::io;
use std::mem::size_of;
use byteorder::{NativeEndian, ReadBytesExt, WriteBytesExt};
use roaring::RoaringTreemap;
use crate::heed_codec::BytesDecodeOwned;
/// This is the limit where using a byteorder became less size efficient
/// than using a direct roaring encoding, it is also the point where we are able
/// to determine the encoding used only by using the array of bytes length.
pub const THRESHOLD: usize = 4;
/// A conditionnal codec that either use the RoaringBitmap
/// or a lighter ByteOrder en/decoding method.
pub struct CboRoaringTreemapCodec;
impl CboRoaringTreemapCodec {
pub fn serialized_size(roaring: &RoaringTreemap) -> usize {
if roaring.len() <= THRESHOLD as u64 {
roaring.len() as usize * size_of::<u64>()
} else {
roaring.serialized_size()
}
}
pub fn serialize_into(roaring: &RoaringTreemap, vec: &mut Vec<u8>) {
if roaring.len() <= THRESHOLD as u64 {
// If the number of items (u32s) to encode is less than or equal to the threshold
// it means that it would weigh the same or less than the RoaringBitmap
// header, so we directly encode them using ByteOrder instead.
for integer in roaring {
vec.write_u64::<NativeEndian>(integer).unwrap();
}
} else {
// Otherwise, we use the classic RoaringBitmapCodec that writes a header.
roaring.serialize_into(vec).unwrap();
}
}
pub fn deserialize_from(mut bytes: &[u8]) -> io::Result<RoaringTreemap> {
if bytes.len() <= THRESHOLD * size_of::<u64>() {
// If there is threshold or less than threshold integers that can fit into this array
// of bytes it means that we used the ByteOrder codec serializer.
let mut bitmap = RoaringTreemap::new();
while let Ok(integer) = bytes.read_u64::<NativeEndian>() {
bitmap.insert(integer);
}
Ok(bitmap)
} else {
// Otherwise, it means we used the classic RoaringBitmapCodec and
// that the header takes threshold integers.
RoaringTreemap::deserialize_unchecked_from(bytes)
}
}
/// Merge serialized CboRoaringBitmaps in a buffer.
///
/// if the merged values length is under the threshold, values are directly
/// serialized in the buffer else a RoaringBitmap is created from the
/// values and is serialized in the buffer.
pub fn merge_into(slices: &[Cow<[u8]>], buffer: &mut Vec<u8>) -> io::Result<()> {
let mut roaring = RoaringTreemap::new();
let mut vec = Vec::new();
for bytes in slices {
if bytes.len() <= THRESHOLD * size_of::<u64>() {
let mut reader = bytes.as_ref();
while let Ok(integer) = reader.read_u64::<NativeEndian>() {
vec.push(integer);
}
} else {
roaring |= RoaringTreemap::deserialize_unchecked_from(bytes.as_ref())?;
}
}
if roaring.is_empty() {
vec.sort_unstable();
vec.dedup();
if vec.len() <= THRESHOLD {
for integer in vec {
buffer.extend_from_slice(&integer.to_ne_bytes());
}
} else {
// We can unwrap safely because the vector is sorted upper.
let roaring = RoaringTreemap::from_sorted_iter(vec.into_iter()).unwrap();
roaring.serialize_into(buffer)?;
}
} else {
roaring.extend(vec);
roaring.serialize_into(buffer)?;
}
Ok(())
}
}
impl heed::BytesDecode<'_> for CboRoaringTreemapCodec {
type DItem = RoaringTreemap;
fn bytes_decode(bytes: &[u8]) -> Option<Self::DItem> {
Self::deserialize_from(bytes).ok()
}
}
impl BytesDecodeOwned for CboRoaringTreemapCodec {
type DItem = RoaringTreemap;
fn bytes_decode_owned(bytes: &[u8]) -> Option<Self::DItem> {
Self::deserialize_from(bytes).ok()
}
}
impl heed::BytesEncode<'_> for CboRoaringTreemapCodec {
type EItem = RoaringTreemap;
fn bytes_encode(item: &Self::EItem) -> Option<Cow<[u8]>> {
let mut vec = Vec::with_capacity(Self::serialized_size(item));
Self::serialize_into(item, &mut vec);
Some(Cow::Owned(vec))
}
}
#[cfg(test)]
mod tests {
use std::iter::FromIterator;
use heed::{BytesDecode, BytesEncode};
use super::*;
#[test]
fn verify_encoding_decoding() {
let input = RoaringTreemap::from_iter(0..THRESHOLD as u64);
let bytes = CboRoaringTreemapCodec::bytes_encode(&input).unwrap();
let output = CboRoaringTreemapCodec::bytes_decode(&bytes).unwrap();
assert_eq!(input, output);
}
#[test]
fn verify_threshold() {
let input = RoaringTreemap::from_iter(0..THRESHOLD as u64);
// use roaring treemap
let mut bytes = Vec::new();
input.serialize_into(&mut bytes).unwrap();
let roaring_size = bytes.len();
// use byteorder directly
let mut bytes = Vec::new();
for integer in input {
bytes.write_u64::<NativeEndian>(integer).unwrap();
}
let bo_size = bytes.len();
assert!(roaring_size > bo_size, "roaring size: {}, bo size {}", roaring_size, bo_size);
}
#[test]
fn merge_cbo_roaring_bitmaps() {
let mut buffer = Vec::new();
let small_data = vec![
RoaringTreemap::from_sorted_iter(1..4).unwrap(),
RoaringTreemap::from_sorted_iter(2..5).unwrap(),
RoaringTreemap::from_sorted_iter(4..6).unwrap(),
RoaringTreemap::from_sorted_iter(1..3).unwrap(),
];
let small_data: Vec<_> =
small_data.iter().map(|b| CboRoaringTreemapCodec::bytes_encode(b).unwrap()).collect();
CboRoaringTreemapCodec::merge_into(small_data.as_slice(), &mut buffer).unwrap();
let bitmap = CboRoaringTreemapCodec::deserialize_from(&buffer).unwrap();
let expected = RoaringTreemap::from_sorted_iter(1..6).unwrap();
assert_eq!(bitmap, expected);
let medium_data = vec![
RoaringTreemap::from_sorted_iter(1..4).unwrap(),
RoaringTreemap::from_sorted_iter(2..5).unwrap(),
RoaringTreemap::from_sorted_iter(4..8).unwrap(),
RoaringTreemap::from_sorted_iter(0..3).unwrap(),
RoaringTreemap::from_sorted_iter(7..23).unwrap(),
];
let medium_data: Vec<_> =
medium_data.iter().map(|b| CboRoaringTreemapCodec::bytes_encode(b).unwrap()).collect();
buffer.clear();
CboRoaringTreemapCodec::merge_into(medium_data.as_slice(), &mut buffer).unwrap();
let bitmap = CboRoaringTreemapCodec::deserialize_from(&buffer).unwrap();
let expected = RoaringTreemap::from_sorted_iter(0..23).unwrap();
assert_eq!(bitmap, expected);
}
}

View File

@ -1,7 +1,11 @@
mod bo_roaring_bitmap_codec;
pub mod cbo_roaring_bitmap_codec;
pub mod cbo_roaring_treemap_codec;
mod roaring_bitmap_codec;
mod roaring_treemap_codec;
pub use self::bo_roaring_bitmap_codec::BoRoaringBitmapCodec;
pub use self::cbo_roaring_bitmap_codec::CboRoaringBitmapCodec;
pub use self::cbo_roaring_treemap_codec::CboRoaringTreemapCodec;
pub use self::roaring_bitmap_codec::RoaringBitmapCodec;
pub use self::roaring_treemap_codec::RoaringTreemapCodec;

View File

@ -0,0 +1,33 @@
use std::borrow::Cow;
use roaring::RoaringTreemap;
use crate::heed_codec::BytesDecodeOwned;
pub struct RoaringTreemapCodec;
impl heed::BytesDecode<'_> for RoaringTreemapCodec {
type DItem = RoaringTreemap;
fn bytes_decode(bytes: &[u8]) -> Option<Self::DItem> {
RoaringTreemap::deserialize_unchecked_from(bytes).ok()
}
}
impl BytesDecodeOwned for RoaringTreemapCodec {
type DItem = RoaringTreemap;
fn bytes_decode_owned(bytes: &[u8]) -> Option<Self::DItem> {
RoaringTreemap::deserialize_from(bytes).ok()
}
}
impl heed::BytesEncode<'_> for RoaringTreemapCodec {
type EItem = RoaringTreemap;
fn bytes_encode(item: &Self::EItem) -> Option<Cow<[u8]>> {
let mut bytes = Vec::with_capacity(item.serialized_size());
item.serialize_into(&mut bytes).ok()?;
Some(Cow::Owned(bytes))
}
}