mirror of https://github.com/meilisearch/meilisearch.git (synced 2025-12-12 23:47:00 +00:00)

Compare commits: proper-def ... prototype- (3 commits)
Commits:
  ba0aef0287
  26e368b116
  ba95ac0915
.github/workflows/sdks-tests.yml (vendored, 12 changed lines)
@@ -25,14 +25,18 @@ jobs:
       - uses: actions/checkout@v5
       - name: Define the Docker image we need to use
         id: define-image
+        env:
+          EVENT_NAME: ${{ github.event_name }}
+          DOCKER_IMAGE_INPUT: ${{ github.event.inputs.docker_image }}
         run: |
-          event=${{ github.event_name }}
           echo "docker-image=nightly" >> $GITHUB_OUTPUT
-          if [[ $event == 'workflow_dispatch' ]]; then
-            echo "docker-image=${{ github.event.inputs.docker_image }}" >> $GITHUB_OUTPUT
+          if [[ "$EVENT_NAME" == 'workflow_dispatch' ]]; then
+            echo "docker-image=$DOCKER_IMAGE_INPUT" >> $GITHUB_OUTPUT
           fi
       - name: Docker image is ${{ steps.define-image.outputs.docker-image }}
-        run: echo "Docker image is ${{ steps.define-image.outputs.docker-image }}"
+        env:
+          DOCKER_IMAGE: ${{ steps.define-image.outputs.docker-image }}
+        run: echo "Docker image is $DOCKER_IMAGE"

   ##########
   ## SDKs ##
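This change moves GitHub expression interpolation out of the run scripts and into step-level env vars: `${{ github.event.inputs.docker_image }}` is now expanded by the runner into an environment variable before the shell ever sees it, so a crafted input can no longer inject shell commands into the script. Referencing the values as quoted variables ("$EVENT_NAME", "$DOCKER_IMAGE_INPUT") keeps them as plain data.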
@@ -502,11 +502,13 @@ impl Queue {
             *before_finished_at,
         )?;

-        batches = if query.reverse.unwrap_or_default() {
-            batches.into_iter().take(*limit).collect()
-        } else {
-            batches.into_iter().rev().take(*limit).collect()
-        };
+        if let Some(limit) = limit {
+            batches = if query.reverse.unwrap_or_default() {
+                batches.into_iter().take(*limit as usize).collect()
+            } else {
+                batches.into_iter().rev().take(*limit as usize).collect()
+            };
+        }

         Ok(batches)
     }
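The hunk changes the limit from always-applied to opt-in: truncation now only happens when a limit was actually supplied. A standalone sketch of the same pattern (illustrative names, not the scheduler's API):

    // Truncate `ids` only when a limit was requested; `None` returns everything.
    // Mirrors the hunk above, including the reverse/rev() ordering choice.
    fn apply_limit(ids: Vec<u32>, limit: Option<u32>, reverse: bool) -> Vec<u32> {
        match limit {
            Some(limit) if reverse => ids.into_iter().take(limit as usize).collect(),
            Some(limit) => ids.into_iter().rev().take(limit as usize).collect(),
            None => ids,
        }
    }

    fn main() {
        assert_eq!(apply_limit(vec![0, 1, 2, 3], Some(2), false), vec![3, 2]);
        assert_eq!(apply_limit(vec![0, 1, 2, 3], None, false), vec![0, 1, 2, 3]);
    }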
@@ -600,8 +602,11 @@ impl Queue {
             Box::new(batches.into_iter().rev()) as Box<dyn Iterator<Item = u32>>
         };

-        let batches =
-            self.batches.get_existing_batches(rtxn, batches.take(query.limit), processing)?;
+        let batches = self.batches.get_existing_batches(
+            rtxn,
+            batches.take(query.limit.unwrap_or(u32::MAX) as usize),
+            processing,
+        )?;

        Ok((batches, total))
    }
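Where the fetch used to consume `query.limit` directly, the `Option` is now collapsed with `unwrap_or(u32::MAX)`, which makes `take` effectively a no-op when no limit is set. A tiny illustration of that idiom:

    fn main() {
        // With no limit, take(u32::MAX as usize) passes every id through.
        let no_limit: Option<u32> = None;
        let all: Vec<u32> = (0u32..5).take(no_limit.unwrap_or(u32::MAX) as usize).collect();
        assert_eq!(all, vec![0, 1, 2, 3, 4]);

        // With a limit, only that many ids survive.
        let capped: Vec<u32> = (0u32..5).take(Some(2).unwrap_or(u32::MAX) as usize).collect();
        assert_eq!(capped, vec![0, 1]);
    }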
@@ -28,21 +28,21 @@ fn query_batches_from_and_limit() {

     let proc = index_scheduler.processing_tasks.read().unwrap().clone();
     let rtxn = index_scheduler.env.read_txn().unwrap();
-    let query = Query { limit: 0, ..Default::default() };
+    let query = Query { limit: Some(0), ..Default::default() };
     let (batches, _) = index_scheduler
         .queue
         .get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &proc)
         .unwrap();
     snapshot!(snapshot_bitmap(&batches), @"[]");

-    let query = Query { limit: 1, ..Default::default() };
+    let query = Query { limit: Some(1), ..Default::default() };
     let (batches, _) = index_scheduler
         .queue
         .get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &proc)
         .unwrap();
     snapshot!(snapshot_bitmap(&batches), @"[2,]");

-    let query = Query { limit: 2, ..Default::default() };
+    let query = Query { limit: Some(2), ..Default::default() };
     let (batches, _) = index_scheduler
         .queue
         .get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &proc)
@@ -63,14 +63,14 @@ fn query_batches_from_and_limit() {
         .unwrap();
     snapshot!(snapshot_bitmap(&batches), @"[0,1,2,]");

-    let query = Query { from: Some(1), limit: 1, ..Default::default() };
+    let query = Query { from: Some(1), limit: Some(1), ..Default::default() };
     let (batches, _) = index_scheduler
         .queue
         .get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &proc)
         .unwrap();
     snapshot!(snapshot_bitmap(&batches), @"[1,]");

-    let query = Query { from: Some(1), limit: 2, ..Default::default() };
+    let query = Query { from: Some(1), limit: Some(2), ..Default::default() };
     let (batches, _) = index_scheduler
         .queue
         .get_batch_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &proc)
@@ -31,9 +31,6 @@ use crate::{Error, IndexSchedulerOptions, Result, TaskId};

 /// The number of database used by queue itself
 const NUMBER_OF_DATABASES: u32 = 1;
-/// The default limit for pagination
-const DEFAULT_LIMIT: usize = 20;
-
 /// Database const names for the `IndexScheduler`.
 mod db_name {
     pub const BATCH_TO_TASKS_MAPPING: &str = "batch-to-tasks-mapping";
@@ -43,11 +40,11 @@ mod db_name {
 ///
 /// An empty/default query (where each field is set to `None`) matches all tasks.
 /// Each non-null field restricts the set of tasks further.
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Default, Debug, Clone, PartialEq, Eq)]
 pub struct Query {
-    /// The maximum number of tasks to be matched. Defaults to 20.
-    pub limit: usize,
-    /// The minimum [task id](`meilisearch_types::tasks::Task::uid`) to be matched. Defaults to 0.
+    /// The maximum number of tasks to be matched
+    pub limit: Option<u32>,
+    /// The minimum [task id](`meilisearch_types::tasks::Task::uid`) to be matched
     pub from: Option<u32>,
     /// The order used to return the tasks. By default the newest tasks are returned first and the boolean is `false`.
     pub reverse: Option<bool>,
@@ -86,29 +83,32 @@ pub struct Query {
     pub after_finished_at: Option<OffsetDateTime>,
 }

-impl Default for Query {
-    fn default() -> Self {
-        Self {
-            limit: DEFAULT_LIMIT,
-            from: Default::default(),
-            reverse: Default::default(),
-            uids: Default::default(),
-            batch_uids: Default::default(),
-            statuses: Default::default(),
-            types: Default::default(),
-            index_uids: Default::default(),
-            canceled_by: Default::default(),
-            before_enqueued_at: Default::default(),
-            after_enqueued_at: Default::default(),
-            before_started_at: Default::default(),
-            after_started_at: Default::default(),
-            before_finished_at: Default::default(),
-            after_finished_at: Default::default(),
-        }
-    }
-}
-
 impl Query {
     /// Return `true` if every field of the query is set to `None`, such that the query
     /// matches all tasks.
     pub fn is_empty(&self) -> bool {
         matches!(
             self,
             Query {
+                limit: None,
                 from: None,
                 reverse: None,
                 uids: None,
                 batch_uids: None,
                 statuses: None,
                 types: None,
                 index_uids: None,
                 canceled_by: None,
                 before_enqueued_at: None,
                 after_enqueued_at: None,
                 before_started_at: None,
                 after_started_at: None,
                 before_finished_at: None,
                 after_finished_at: None,
             }
         )
     }

     /// Add an [index id](meilisearch_types::tasks::Task::index_uid) to the list of permitted indexes.
     pub fn with_index(self, index_uid: String) -> Self {
         let mut index_vec = self.index_uids.unwrap_or_default();
@@ -119,7 +119,7 @@ impl Query {
     // Removes the `from` and `limit` restrictions from the query.
     // Useful to get the total number of tasks matching a filter.
     pub fn without_limits(self) -> Self {
-        Query { limit: usize::MAX, from: None, ..self }
+        Query { limit: None, from: None, ..self }
     }
 }

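With `limit: Option<u32>`, every field of `Query` now has a `None`-style default, so the hand-written `Default` impl above can be replaced by `#[derive(Default)]`, and `without_limits` resets `limit` to `None` instead of the `usize::MAX` sentinel. A reduced sketch with only the three fields shown in this diff:

    #[derive(Default, Debug, Clone, PartialEq, Eq)]
    struct Query {
        limit: Option<u32>,
        from: Option<u32>,
        reverse: Option<bool>,
    }

    fn main() {
        // Deriving Default gives all-None, the "match everything" query.
        assert_eq!(Query::default(), Query { limit: None, from: None, reverse: None });
        // Callers opt into pagination explicitly.
        let q = Query { limit: Some(20), ..Default::default() };
        assert_eq!(q.limit, Some(20));
    }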
@@ -465,11 +465,13 @@ impl Queue {
             *before_finished_at,
         )?;

-        tasks = if query.reverse.unwrap_or_default() {
-            tasks.into_iter().take(*limit).collect()
-        } else {
-            tasks.into_iter().rev().take(*limit).collect()
-        };
+        if let Some(limit) = limit {
+            tasks = if query.reverse.unwrap_or_default() {
+                tasks.into_iter().take(*limit as usize).collect()
+            } else {
+                tasks.into_iter().rev().take(*limit as usize).collect()
+            };
+        }

         Ok(tasks)
     }
@@ -527,7 +529,9 @@ impl Queue {
         } else {
             Box::new(tasks.into_iter().rev()) as Box<dyn Iterator<Item = u32>>
         };
-        let tasks = self.tasks.get_existing_tasks(rtxn, tasks.take(query.limit))?;
+        let tasks = self
+            .tasks
+            .get_existing_tasks(rtxn, tasks.take(query.limit.unwrap_or(u32::MAX) as usize))?;

         let ProcessingTasks { batch, processing, progress: _ } = processing_tasks;

@@ -28,21 +28,21 @@ fn query_tasks_from_and_limit() {

     let rtxn = index_scheduler.env.read_txn().unwrap();
     let processing = index_scheduler.processing_tasks.read().unwrap();
-    let query = Query { limit: 0, ..Default::default() };
+    let query = Query { limit: Some(0), ..Default::default() };
     let (tasks, _) = index_scheduler
         .queue
         .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &processing)
         .unwrap();
     snapshot!(snapshot_bitmap(&tasks), @"[]");

-    let query = Query { limit: 1, ..Default::default() };
+    let query = Query { limit: Some(1), ..Default::default() };
     let (tasks, _) = index_scheduler
         .queue
         .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &processing)
         .unwrap();
     snapshot!(snapshot_bitmap(&tasks), @"[2,]");

-    let query = Query { limit: 2, ..Default::default() };
+    let query = Query { limit: Some(2), ..Default::default() };
     let (tasks, _) = index_scheduler
         .queue
         .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &processing)
@@ -63,14 +63,14 @@ fn query_tasks_from_and_limit() {
         .unwrap();
     snapshot!(snapshot_bitmap(&tasks), @"[0,1,2,]");

-    let query = Query { from: Some(1), limit: 1, ..Default::default() };
+    let query = Query { from: Some(1), limit: Some(1), ..Default::default() };
     let (tasks, _) = index_scheduler
         .queue
         .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &processing)
         .unwrap();
     snapshot!(snapshot_bitmap(&tasks), @"[1,]");

-    let query = Query { from: Some(1), limit: 2, ..Default::default() };
+    let query = Query { from: Some(1), limit: Some(2), ..Default::default() };
     let (tasks, _) = index_scheduler
         .queue
         .get_task_ids_from_authorized_indexes(&rtxn, &query, &AuthFilter::default(), &processing)

@@ -14,6 +14,34 @@ use crate::{Error, IndexScheduler, Result};

 const UPDATE_FILES_DIR_NAME: &str = "update_files";

+#[derive(Debug, Clone, serde::Deserialize)]
+struct StsCredentials {
+    #[serde(rename = "AccessKeyId")]
+    access_key_id: String,
+    #[serde(rename = "SecretAccessKey")]
+    secret_access_key: String,
+    #[serde(rename = "SessionToken")]
+    session_token: String,
+}
+
+#[derive(Debug, serde::Deserialize)]
+struct AssumeRoleWithWebIdentityResult {
+    #[serde(rename = "Credentials")]
+    credentials: StsCredentials,
+}
+
+#[derive(Debug, serde::Deserialize)]
+struct AssumeRoleWithWebIdentityResponse {
+    #[serde(rename = "AssumeRoleWithWebIdentityResult")]
+    result: AssumeRoleWithWebIdentityResult,
+}
+
+#[derive(Debug, serde::Deserialize)]
+struct StsResponse {
+    #[serde(rename = "AssumeRoleWithWebIdentityResponse")]
+    response: AssumeRoleWithWebIdentityResponse,
+}
+
 /// # Safety
 ///
 /// See [`EnvOpenOptions::open`].
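The chain of `#[serde(rename = ...)]` wrappers mirrors the nested document STS returns when the request carries `Accept: application/json`. An illustrative body (values are placeholders, and real responses carry extra fields that serde ignores) that the types above would deserialize, assuming they are in scope:

    let body = r#"{
        "AssumeRoleWithWebIdentityResponse": {
            "AssumeRoleWithWebIdentityResult": {
                "Credentials": {
                    "AccessKeyId": "ASIAEXAMPLE",
                    "SecretAccessKey": "example-secret",
                    "SessionToken": "example-token"
                }
            }
        }
    }"#;
    let parsed: StsResponse = serde_json::from_str(body).unwrap();
    assert_eq!(parsed.response.result.credentials.session_token, "example-token");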
@@ -231,6 +259,49 @@ impl IndexScheduler {
         Ok(tasks)
     }

+    #[cfg(unix)]
+    async fn assume_role_with_web_identity(
+        role_arn: &str,
+        web_identity_token_file: &str,
+    ) -> Result<StsCredentials, anyhow::Error> {
+        let token = fs::read_to_string(web_identity_token_file)
+            .map_err(|e| anyhow::anyhow!("Failed to read web identity token file: {}", e))?;
+
+        let form_data = [
+            ("Action", "AssumeRoleWithWebIdentity"),
+            ("Version", "2011-06-15"),
+            ("RoleArn", role_arn),
+            ("RoleSessionName", "meilisearch-snapshot-session"),
+            ("WebIdentityToken", &token),
+            ("DurationSeconds", "3600"),
+        ];
+
+        let client = reqwest::Client::new();
+        let response = client
+            .post("https://sts.amazonaws.com/")
+            .header("Accept", "application/json")
+            .header("Content-Type", "application/x-www-form-urlencoded")
+            .form(&form_data)
+            .send()
+            .await
+            .map_err(|e| anyhow::anyhow!("Failed to send STS request: {}", e))?;
+
+        let status = response.status();
+        let body = response
+            .text()
+            .await
+            .map_err(|e| anyhow::anyhow!("Failed to read STS response body: {}", e))?;
+
+        if !status.is_success() {
+            return Err(anyhow::anyhow!("STS request failed with status {}: {}", status, body));
+        }
+
+        let sts_response: StsResponse = serde_json::from_str(&body)
+            .map_err(|e| anyhow::anyhow!("Failed to deserialize STS response: {}", e))?;
+
+        Ok(sts_response.response.result.credentials)
+    }
+
     #[cfg(unix)]
     pub(super) async fn process_snapshot_to_s3(
         &self,
@@ -247,6 +318,8 @@ impl IndexScheduler {
             s3_snapshot_prefix,
             s3_access_key,
             s3_secret_key,
+            s3_role_arn,
+            s3_web_identity_token_file,
             s3_max_in_flight_parts,
             s3_compression_level: level,
             s3_signature_duration,
@@ -262,21 +335,37 @@ impl IndexScheduler {
         };

         let (reader, writer) = std::io::pipe()?;
-        let uploader_task = tokio::spawn(multipart_stream_to_s3(
-            s3_bucket_url,
-            s3_bucket_region,
-            s3_bucket_name,
-            s3_snapshot_prefix,
-            s3_access_key,
-            s3_secret_key,
-            s3_max_in_flight_parts,
-            s3_signature_duration,
-            s3_multipart_part_size,
-            must_stop_processing,
-            retry_backoff,
-            db_name,
-            reader,
-        ));
+        let uploader_task = tokio::spawn(async move {
+            let (s3_access_key, s3_secret_key, s3_token) = if !s3_role_arn.is_empty()
+                && !s3_web_identity_token_file.is_empty()
+            {
+                let StsCredentials { access_key_id, secret_access_key, session_token } =
+                    Self::assume_role_with_web_identity(&s3_role_arn, &s3_web_identity_token_file)
+                        .await
+                        .map_err(Error::Anyhow)?;
+                (access_key_id, secret_access_key, Some(session_token))
+            } else {
+                (s3_access_key, s3_secret_key, None)
+            };
+
+            multipart_stream_to_s3(
+                s3_bucket_url,
+                s3_bucket_region,
+                s3_bucket_name,
+                s3_snapshot_prefix,
+                s3_access_key,
+                s3_secret_key,
+                s3_token,
+                s3_max_in_flight_parts,
+                s3_signature_duration,
+                s3_multipart_part_size,
+                must_stop_processing,
+                retry_backoff,
+                db_name,
+                reader,
+            )
+            .await
+        });

         let index_scheduler = IndexScheduler::private_clone(self);
         let builder_task = tokio::task::spawn_blocking(move || {
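Note that the credential resolution now happens inside the spawned task: `assume_role_with_web_identity` is async, so exchanging the web identity token for temporary credentials has to run on the runtime before `multipart_stream_to_s3` is awaited. When no role is configured, the static key pair passes through untouched and no session token is sent.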
@@ -430,6 +519,7 @@ async fn multipart_stream_to_s3(
     s3_snapshot_prefix: String,
     s3_access_key: String,
     s3_secret_key: String,
+    s3_token: Option<String>,
     s3_max_in_flight_parts: std::num::NonZero<usize>,
     s3_signature_duration: std::time::Duration,
     s3_multipart_part_size: u64,
@@ -456,7 +546,10 @@ async fn multipart_stream_to_s3(
         s3_bucket_url.parse().map_err(BucketError::ParseError).map_err(Error::S3BucketError)?;
     let bucket = Bucket::new(url, UrlStyle::Path, s3_bucket_name, s3_bucket_region)
         .map_err(Error::S3BucketError)?;
-    let credential = Credentials::new(s3_access_key, s3_secret_key);
+    let credential = match s3_token {
+        Some(token) => Credentials::new_with_token(s3_access_key, s3_secret_key, token),
+        None => Credentials::new(s3_access_key, s3_secret_key),
+    };

     // Note for the future (rust 1.91+): use with_added_extension, it's prettier
     let object_path = s3_snapshot_prefix.join(format!("{db_name}.snapshot"));

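Temporary STS credentials are only honored when the accompanying session token is sent along with the signed requests, which is presumably why the token-aware `Credentials::new_with_token` constructor is chosen here; static keys keep the old two-argument constructor.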
@@ -85,6 +85,8 @@ const MEILI_S3_BUCKET_NAME: &str = "MEILI_S3_BUCKET_NAME";
 const MEILI_S3_SNAPSHOT_PREFIX: &str = "MEILI_S3_SNAPSHOT_PREFIX";
 const MEILI_S3_ACCESS_KEY: &str = "MEILI_S3_ACCESS_KEY";
 const MEILI_S3_SECRET_KEY: &str = "MEILI_S3_SECRET_KEY";
+const MEILI_S3_ROLE_ARN: &str = "MEILI_S3_ROLE_ARN";
+const MEILI_S3_WEB_IDENTITY_TOKEN_FILE: &str = "MEILI_S3_WEB_IDENTITY_TOKEN_FILE";
 const MEILI_EXPERIMENTAL_S3_MAX_IN_FLIGHT_PARTS: &str = "MEILI_EXPERIMENTAL_S3_MAX_IN_FLIGHT_PARTS";
 const MEILI_EXPERIMENTAL_S3_COMPRESSION_LEVEL: &str = "MEILI_EXPERIMENTAL_S3_COMPRESSION_LEVEL";
 const MEILI_EXPERIMENTAL_S3_SIGNATURE_DURATION_SECONDS: &str =
@@ -942,7 +944,7 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
 // This group is a bit tricky but makes it possible to require all listed fields if one of them
 // is specified. It lets us keep an Option for the S3SnapshotOpts configuration.
 // <https://github.com/clap-rs/clap/issues/5092#issuecomment-2616986075>
-#[group(requires_all = ["s3_bucket_url", "s3_bucket_region", "s3_bucket_name", "s3_snapshot_prefix", "s3_access_key", "s3_secret_key"])]
+#[group(requires_all = ["s3_bucket_url", "s3_bucket_region", "s3_bucket_name", "s3_snapshot_prefix"])]
 pub struct S3SnapshotOpts {
     /// The S3 bucket URL in the format https://s3.<region>.amazonaws.com.
     #[clap(long, env = MEILI_S3_BUCKET_URL, required = false)]
@@ -974,6 +976,16 @@ pub struct S3SnapshotOpts {
     #[serde(default)]
     pub s3_secret_key: String,

+    /// The IAM role ARN for web identity authentication.
+    #[clap(long, env = MEILI_S3_ROLE_ARN, required = false)]
+    #[serde(default)]
+    pub s3_role_arn: String,
+
+    /// The path to the web identity token file for IAM role authentication.
+    #[clap(long, env = MEILI_S3_WEB_IDENTITY_TOKEN_FILE, required = false)]
+    #[serde(default)]
+    pub s3_web_identity_token_file: String,
+
     /// The maximum number of parts that can be uploaded in parallel.
     ///
     /// For more information, see <https://github.com/orgs/meilisearch/discussions/869>.
@@ -1017,6 +1029,8 @@ impl S3SnapshotOpts {
             s3_snapshot_prefix,
             s3_access_key,
             s3_secret_key,
+            s3_role_arn,
+            s3_web_identity_token_file,
             experimental_s3_max_in_flight_parts,
             experimental_s3_compression_level,
             experimental_s3_signature_duration_seconds,
@@ -1029,6 +1043,8 @@ impl S3SnapshotOpts {
         export_to_env_if_not_present(MEILI_S3_SNAPSHOT_PREFIX, s3_snapshot_prefix);
         export_to_env_if_not_present(MEILI_S3_ACCESS_KEY, s3_access_key);
         export_to_env_if_not_present(MEILI_S3_SECRET_KEY, s3_secret_key);
+        export_to_env_if_not_present(MEILI_S3_ROLE_ARN, s3_role_arn);
+        export_to_env_if_not_present(MEILI_S3_WEB_IDENTITY_TOKEN_FILE, s3_web_identity_token_file);
         export_to_env_if_not_present(
             MEILI_EXPERIMENTAL_S3_MAX_IN_FLIGHT_PARTS,
             experimental_s3_max_in_flight_parts.to_string(),
@@ -1059,12 +1075,24 @@ impl TryFrom<S3SnapshotOpts> for S3SnapshotOptions {
             s3_snapshot_prefix,
             s3_access_key,
             s3_secret_key,
+            s3_role_arn,
+            s3_web_identity_token_file,
             experimental_s3_max_in_flight_parts,
             experimental_s3_compression_level,
             experimental_s3_signature_duration_seconds,
             experimental_s3_multipart_part_size,
         } = other;

+        let has_access_key_auth = !s3_access_key.is_empty() && !s3_secret_key.is_empty();
+        let has_role_arn_auth = !s3_role_arn.is_empty() && !s3_web_identity_token_file.is_empty();
+
+        // Specify either one of the authentication options, but not both
+        if has_access_key_auth && has_role_arn_auth {
+            anyhow::bail!(
+                "Please use either MEILI_S3_ACCESS_KEY and MEILI_S3_SECRET_KEY, or MEILI_S3_ROLE_ARN and MEILI_S3_WEB_IDENTITY_TOKEN_FILE."
+            );
+        }
+
         Ok(S3SnapshotOptions {
             s3_bucket_url,
             s3_bucket_region,
@@ -1072,6 +1100,8 @@ impl TryFrom<S3SnapshotOpts> for S3SnapshotOptions {
             s3_snapshot_prefix,
             s3_access_key,
             s3_secret_key,
+            s3_role_arn,
+            s3_web_identity_token_file,
             s3_max_in_flight_parts: experimental_s3_max_in_flight_parts,
             s3_compression_level: experimental_s3_compression_level,
             s3_signature_duration: Duration::from_secs(experimental_s3_signature_duration_seconds),

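With this validation a deployment picks exactly one authentication style. An illustrative role-based configuration using the environment variables defined above (the ARN and file path are placeholders, not defaults):

    MEILI_S3_BUCKET_URL=https://s3.us-east-1.amazonaws.com
    MEILI_S3_BUCKET_REGION=us-east-1
    MEILI_S3_BUCKET_NAME=my-snapshots-bucket
    MEILI_S3_SNAPSHOT_PREFIX=snapshots
    MEILI_S3_ROLE_ARN=arn:aws:iam::123456789012:role/meilisearch-snapshots
    MEILI_S3_WEB_IDENTITY_TOKEN_FILE=/var/run/secrets/tokens/web-identity-token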
@@ -185,7 +185,7 @@ pub async fn get_metrics(
             // Fetch the finished batches...
             &Query {
                 statuses: Some(vec![Status::Succeeded, Status::Failed]),
-                limit: 1,
+                limit: Some(1),
                 ..Query::default()
             },
             auth_filters,
@@ -214,7 +214,7 @@ pub async fn get_metrics(
     let task_queue_latency_seconds = index_scheduler
         .get_tasks_from_authorized_indexes(
             &Query {
-                limit: 1,
+                limit: Some(1),
                 reverse: Some(true),
                 statuses: Some(vec![Status::Enqueued, Status::Processing]),
                 ..Query::default()

@@ -126,7 +126,7 @@ pub struct TasksFilterQuery {
 impl TasksFilterQuery {
     pub(crate) fn into_query(self) -> Query {
         Query {
-            limit: self.limit.0 as usize,
+            limit: Some(self.limit.0),
             from: self.from.as_deref().copied(),
             reverse: self.reverse.as_deref().copied(),
             batch_uids: self.batch_uids.merge_star_and_none(),
@@ -225,8 +225,7 @@ pub struct TaskDeletionOrCancelationQuery {
 impl TaskDeletionOrCancelationQuery {
     fn into_query(self) -> Query {
         Query {
-            // We want to delete all tasks that match the given filters
-            limit: usize::MAX,
+            limit: None,
             from: None,
             reverse: None,
             batch_uids: self.batch_uids.merge_star_and_none(),

@@ -49,6 +49,8 @@ pub struct S3SnapshotOptions {
     pub s3_snapshot_prefix: String,
     pub s3_access_key: String,
     pub s3_secret_key: String,
+    pub s3_role_arn: String,
+    pub s3_web_identity_token_file: String,
     pub s3_max_in_flight_parts: NonZeroUsize,
     pub s3_compression_level: u32,
     pub s3_signature_duration: Duration,