Compare commits

..

2 Commits

Author SHA1 Message Date
Kerollmops
0be7db9b42 Move the default pagination limit into a const 2025-12-09 15:24:12 +01:00
Kerollmops
051c084aba Make the Query limit mandatory 2025-12-09 15:21:20 +01:00
10 changed files with 69 additions and 80 deletions

View File

@@ -25,18 +25,14 @@ jobs:
- uses: actions/checkout@v5
- name: Define the Docker image we need to use
id: define-image
env:
EVENT_NAME: ${{ github.event_name }}
DOCKER_IMAGE_INPUT: ${{ github.event.inputs.docker_image }}
run: |
event=${{ github.event_name }}
echo "docker-image=nightly" >> $GITHUB_OUTPUT
if [[ "$EVENT_NAME" == 'workflow_dispatch' ]]; then
echo "docker-image=$DOCKER_IMAGE_INPUT" >> $GITHUB_OUTPUT
if [[ $event == 'workflow_dispatch' ]]; then
echo "docker-image=${{ github.event.inputs.docker_image }}" >> $GITHUB_OUTPUT
fi
- name: Docker image is ${{ steps.define-image.outputs.docker-image }}
env:
DOCKER_IMAGE: ${{ steps.define-image.outputs.docker-image }}
run: echo "Docker image is $DOCKER_IMAGE"
run: echo "Docker image is ${{ steps.define-image.outputs.docker-image }}"
##########
## SDKs ##

12
Cargo.lock generated
View File

@@ -2790,7 +2790,8 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "heed"
version = "0.22.1-nested-rtxns-6"
source = "git+https://github.com/meilisearch/heed?branch=allow-nested-rtxn-from-wtxn#464a9c33f76e0679115eef7379c9bdde9d5e1eda"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c69e07cd539834bedcfa938f3d7d8520cce1ad2b0776c122b5ccdf8fd5bafe12"
dependencies = [
"bitflags 2.10.0",
"byteorder",
@@ -2807,12 +2808,14 @@ dependencies = [
[[package]]
name = "heed-traits"
version = "0.20.0"
source = "git+https://github.com/meilisearch/heed?branch=allow-nested-rtxn-from-wtxn#464a9c33f76e0679115eef7379c9bdde9d5e1eda"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb3130048d404c57ce5a1ac61a903696e8fcde7e8c2991e9fcfc1f27c3ef74ff"
[[package]]
name = "heed-types"
version = "0.21.0"
source = "git+https://github.com/meilisearch/heed?branch=allow-nested-rtxn-from-wtxn#464a9c33f76e0679115eef7379c9bdde9d5e1eda"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13c255bdf46e07fb840d120a36dcc81f385140d7191c76a7391672675c01a55d"
dependencies = [
"bincode 1.3.3",
"byteorder",
@@ -3802,7 +3805,8 @@ checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77"
[[package]]
name = "lmdb-master-sys"
version = "0.2.6-nested-rtxns-6"
source = "git+https://github.com/meilisearch/heed?branch=allow-nested-rtxn-from-wtxn#464a9c33f76e0679115eef7379c9bdde9d5e1eda"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e113d9bf240f974fbe7fd516cbfd8c422e925c0655495501c7237548425493d0"
dependencies = [
"cc",
"doxygen-rs",

View File

@@ -52,6 +52,3 @@ opt-level = 3
opt-level = 3
[profile.dev.package.gemm-f16]
opt-level = 3
[patch.crates-io]
heed = { git = "https://github.com/meilisearch/heed", branch = "allow-nested-rtxn-from-wtxn" }

View File

@@ -502,13 +502,11 @@ impl Queue {
*before_finished_at,
)?;
if let Some(limit) = limit {
batches = if query.reverse.unwrap_or_default() {
batches.into_iter().take(*limit as usize).collect()
} else {
batches.into_iter().rev().take(*limit as usize).collect()
};
}
batches = if query.reverse.unwrap_or_default() {
batches.into_iter().take(*limit).collect()
} else {
batches.into_iter().rev().take(*limit).collect()
};
Ok(batches)
}
@@ -602,11 +600,8 @@ impl Queue {
Box::new(batches.into_iter().rev()) as Box<dyn Iterator<Item = u32>>
};
let batches = self.batches.get_existing_batches(
rtxn,
batches.take(query.limit.unwrap_or(u32::MAX) as usize),
processing,
)?;
let batches =
self.batches.get_existing_batches(rtxn, batches.take(query.limit), processing)?;
Ok((batches, total))
}

View File

@@ -28,21 +28,21 @@ fn query_batches_from_and_limit() {
let proc = index_scheduler.processing_tasks.read().unwrap().clone();
let rtxn = index_scheduler.env.read_txn().unwrap();
let query = Query { limit: Some(0), ..Default::default() };
let query = Query { limit: 0, ..Default::default() };
let (batches, _) = index_scheduler
.queue
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &proc)
.unwrap();
snapshot!(snapshot_bitmap(&batches), @"[]");
let query = Query { limit: Some(1), ..Default::default() };
let query = Query { limit: 1, ..Default::default() };
let (batches, _) = index_scheduler
.queue
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &proc)
.unwrap();
snapshot!(snapshot_bitmap(&batches), @"[2,]");
let query = Query { limit: Some(2), ..Default::default() };
let query = Query { limit: 2, ..Default::default() };
let (batches, _) = index_scheduler
.queue
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &proc)
@@ -63,14 +63,14 @@ fn query_batches_from_and_limit() {
.unwrap();
snapshot!(snapshot_bitmap(&batches), @"[0,1,2,]");
let query = Query { from: Some(1), limit: Some(1), ..Default::default() };
let query = Query { from: Some(1), limit: 1, ..Default::default() };
let (batches, _) = index_scheduler
.queue
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &proc)
.unwrap();
snapshot!(snapshot_bitmap(&batches), @"[1,]");
let query = Query { from: Some(1), limit: Some(2), ..Default::default() };
let query = Query { from: Some(1), limit: 2, ..Default::default() };
let (batches, _) = index_scheduler
.queue
.get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &proc)

View File

@@ -31,6 +31,9 @@ use crate::{Error, IndexSchedulerOptions, Result, TaskId};
/// The number of database used by queue itself
const NUMBER_OF_DATABASES: u32 = 1;
/// The default limit for pagination
const DEFAULT_LIMIT: usize = 20;
/// Database const names for the `IndexScheduler`.
mod db_name {
pub const BATCH_TO_TASKS_MAPPING: &str = "batch-to-tasks-mapping";
@@ -40,11 +43,11 @@ mod db_name {
///
/// An empty/default query (where each field is set to `None`) matches all tasks.
/// Each non-null field restricts the set of tasks further.
#[derive(Default, Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Query {
/// The maximum number of tasks to be matched
pub limit: Option<u32>,
/// The minimum [task id](`meilisearch_types::tasks::Task::uid`) to be matched
/// The maximum number of tasks to be matched. Defaults to 20.
pub limit: usize,
/// The minimum [task id](`meilisearch_types::tasks::Task::uid`) to be matched. Defaults to 0.
pub from: Option<u32>,
/// The order used to return the tasks. By default the newest tasks are returned first and the boolean is `false`.
pub reverse: Option<bool>,
@@ -83,32 +86,29 @@ pub struct Query {
pub after_finished_at: Option<OffsetDateTime>,
}
impl Query {
/// Return `true` if every field of the query is set to `None`, such that the query
/// matches all tasks.
pub fn is_empty(&self) -> bool {
matches!(
self,
Query {
limit: None,
from: None,
reverse: None,
uids: None,
batch_uids: None,
statuses: None,
types: None,
index_uids: None,
canceled_by: None,
before_enqueued_at: None,
after_enqueued_at: None,
before_started_at: None,
after_started_at: None,
before_finished_at: None,
after_finished_at: None,
}
)
impl Default for Query {
fn default() -> Self {
Self {
limit: DEFAULT_LIMIT,
from: Default::default(),
reverse: Default::default(),
uids: Default::default(),
batch_uids: Default::default(),
statuses: Default::default(),
types: Default::default(),
index_uids: Default::default(),
canceled_by: Default::default(),
before_enqueued_at: Default::default(),
after_enqueued_at: Default::default(),
before_started_at: Default::default(),
after_started_at: Default::default(),
before_finished_at: Default::default(),
after_finished_at: Default::default(),
}
}
}
impl Query {
/// Add an [index id](meilisearch_types::tasks::Task::index_uid) to the list of permitted indexes.
pub fn with_index(self, index_uid: String) -> Self {
let mut index_vec = self.index_uids.unwrap_or_default();
@@ -119,7 +119,7 @@ impl Query {
// Removes the `from` and `limit` restrictions from the query.
// Useful to get the total number of tasks matching a filter.
pub fn without_limits(self) -> Self {
Query { limit: None, from: None, ..self }
Query { limit: usize::MAX, from: None, ..self }
}
}

View File

@@ -465,13 +465,11 @@ impl Queue {
*before_finished_at,
)?;
if let Some(limit) = limit {
tasks = if query.reverse.unwrap_or_default() {
tasks.into_iter().take(*limit as usize).collect()
} else {
tasks.into_iter().rev().take(*limit as usize).collect()
};
}
tasks = if query.reverse.unwrap_or_default() {
tasks.into_iter().take(*limit).collect()
} else {
tasks.into_iter().rev().take(*limit).collect()
};
Ok(tasks)
}
@@ -529,9 +527,7 @@ impl Queue {
} else {
Box::new(tasks.into_iter().rev()) as Box<dyn Iterator<Item = u32>>
};
let tasks = self
.tasks
.get_existing_tasks(rtxn, tasks.take(query.limit.unwrap_or(u32::MAX) as usize))?;
let tasks = self.tasks.get_existing_tasks(rtxn, tasks.take(query.limit))?;
let ProcessingTasks { batch, processing, progress: _ } = processing_tasks;

View File

@@ -28,21 +28,21 @@ fn query_tasks_from_and_limit() {
let rtxn = index_scheduler.env.read_txn().unwrap();
let processing = index_scheduler.processing_tasks.read().unwrap();
let query = Query { limit: Some(0), ..Default::default() };
let query = Query { limit: 0, ..Default::default() };
let (tasks, _) = index_scheduler
.queue
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &processing)
.unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[]");
let query = Query { limit: Some(1), ..Default::default() };
let query = Query { limit: 1, ..Default::default() };
let (tasks, _) = index_scheduler
.queue
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &processing)
.unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[2,]");
let query = Query { limit: Some(2), ..Default::default() };
let query = Query { limit: 2, ..Default::default() };
let (tasks, _) = index_scheduler
.queue
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &processing)
@@ -63,14 +63,14 @@ fn query_tasks_from_and_limit() {
.unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]");
let query = Query { from: Some(1), limit: Some(1), ..Default::default() };
let query = Query { from: Some(1), limit: 1, ..Default::default() };
let (tasks, _) = index_scheduler
.queue
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &processing)
.unwrap();
snapshot!(snapshot_bitmap(&tasks), @"[1,]");
let query = Query { from: Some(1), limit: Some(2), ..Default::default() };
let query = Query { from: Some(1), limit: 2, ..Default::default() };
let (tasks, _) = index_scheduler
.queue
.get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &processing)

View File

@@ -185,7 +185,7 @@ pub async fn get_metrics(
// Fetch the finished batches...
&Query {
statuses: Some(vec![Status::Succeeded, Status::Failed]),
limit: Some(1),
limit: 1,
..Query::default()
},
auth_filters,
@@ -214,7 +214,7 @@ pub async fn get_metrics(
let task_queue_latency_seconds = index_scheduler
.get_tasks_from_authorized_indexes(
&Query {
limit: Some(1),
limit: 1,
reverse: Some(true),
statuses: Some(vec![Status::Enqueued, Status::Processing]),
..Query::default()

View File

@@ -126,7 +126,7 @@ pub struct TasksFilterQuery {
impl TasksFilterQuery {
pub(crate) fn into_query(self) -> Query {
Query {
limit: Some(self.limit.0),
limit: self.limit.0 as usize,
from: self.from.as_deref().copied(),
reverse: self.reverse.as_deref().copied(),
batch_uids: self.batch_uids.merge_star_and_none(),
@@ -225,7 +225,8 @@ pub struct TaskDeletionOrCancelationQuery {
impl TaskDeletionOrCancelationQuery {
fn into_query(self) -> Query {
Query {
limit: None,
// We want to delete all tasks that match the given filters
limit: usize::MAX,
from: None,
reverse: None,
batch_uids: self.batch_uids.merge_star_and_none(),