mirror of
https://github.com/meilisearch/meilisearch.git
synced 2025-12-13 16:07:00 +00:00
Compare commits
4 Commits
proper-def
...
support-aw
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
50ddc4668a | ||
|
|
ba0aef0287 | ||
|
|
26e368b116 | ||
|
|
ba95ac0915 |
12
.github/workflows/sdks-tests.yml
vendored
12
.github/workflows/sdks-tests.yml
vendored
@@ -25,14 +25,18 @@ jobs:
|
|||||||
- uses: actions/checkout@v5
|
- uses: actions/checkout@v5
|
||||||
- name: Define the Docker image we need to use
|
- name: Define the Docker image we need to use
|
||||||
id: define-image
|
id: define-image
|
||||||
|
env:
|
||||||
|
EVENT_NAME: ${{ github.event_name }}
|
||||||
|
DOCKER_IMAGE_INPUT: ${{ github.event.inputs.docker_image }}
|
||||||
run: |
|
run: |
|
||||||
event=${{ github.event_name }}
|
|
||||||
echo "docker-image=nightly" >> $GITHUB_OUTPUT
|
echo "docker-image=nightly" >> $GITHUB_OUTPUT
|
||||||
if [[ $event == 'workflow_dispatch' ]]; then
|
if [[ "$EVENT_NAME" == 'workflow_dispatch' ]]; then
|
||||||
echo "docker-image=${{ github.event.inputs.docker_image }}" >> $GITHUB_OUTPUT
|
echo "docker-image=$DOCKER_IMAGE_INPUT" >> $GITHUB_OUTPUT
|
||||||
fi
|
fi
|
||||||
- name: Docker image is ${{ steps.define-image.outputs.docker-image }}
|
- name: Docker image is ${{ steps.define-image.outputs.docker-image }}
|
||||||
run: echo "Docker image is ${{ steps.define-image.outputs.docker-image }}"
|
env:
|
||||||
|
DOCKER_IMAGE: ${{ steps.define-image.outputs.docker-image }}
|
||||||
|
run: echo "Docker image is $DOCKER_IMAGE"
|
||||||
|
|
||||||
##########
|
##########
|
||||||
## SDKs ##
|
## SDKs ##
|
||||||
|
|||||||
@@ -14,6 +14,34 @@ use crate::{Error, IndexScheduler, Result};
|
|||||||
|
|
||||||
const UPDATE_FILES_DIR_NAME: &str = "update_files";
|
const UPDATE_FILES_DIR_NAME: &str = "update_files";
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, serde::Deserialize)]
|
||||||
|
struct StsCredentials {
|
||||||
|
#[serde(rename = "AccessKeyId")]
|
||||||
|
access_key_id: String,
|
||||||
|
#[serde(rename = "SecretAccessKey")]
|
||||||
|
secret_access_key: String,
|
||||||
|
#[serde(rename = "SessionToken")]
|
||||||
|
session_token: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, serde::Deserialize)]
|
||||||
|
struct AssumeRoleWithWebIdentityResult {
|
||||||
|
#[serde(rename = "Credentials")]
|
||||||
|
credentials: StsCredentials,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, serde::Deserialize)]
|
||||||
|
struct AssumeRoleWithWebIdentityResponse {
|
||||||
|
#[serde(rename = "AssumeRoleWithWebIdentityResult")]
|
||||||
|
result: AssumeRoleWithWebIdentityResult,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, serde::Deserialize)]
|
||||||
|
struct StsResponse {
|
||||||
|
#[serde(rename = "AssumeRoleWithWebIdentityResponse")]
|
||||||
|
response: AssumeRoleWithWebIdentityResponse,
|
||||||
|
}
|
||||||
|
|
||||||
/// # Safety
|
/// # Safety
|
||||||
///
|
///
|
||||||
/// See [`EnvOpenOptions::open`].
|
/// See [`EnvOpenOptions::open`].
|
||||||
@@ -231,6 +259,49 @@ impl IndexScheduler {
|
|||||||
Ok(tasks)
|
Ok(tasks)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(unix)]
|
||||||
|
async fn assume_role_with_web_identity(
|
||||||
|
role_arn: &str,
|
||||||
|
web_identity_token_file: &str,
|
||||||
|
) -> Result<StsCredentials, anyhow::Error> {
|
||||||
|
let token = fs::read_to_string(web_identity_token_file)
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to read web identity token file: {}", e))?;
|
||||||
|
|
||||||
|
let form_data = [
|
||||||
|
("Action", "AssumeRoleWithWebIdentity"),
|
||||||
|
("Version", "2011-06-15"),
|
||||||
|
("RoleArn", role_arn),
|
||||||
|
("RoleSessionName", "meilisearch-snapshot-session"),
|
||||||
|
("WebIdentityToken", &token),
|
||||||
|
("DurationSeconds", "3600"),
|
||||||
|
];
|
||||||
|
|
||||||
|
let client = reqwest::Client::new();
|
||||||
|
let response = client
|
||||||
|
.post("https://sts.amazonaws.com/")
|
||||||
|
.header("Accept", "application/json")
|
||||||
|
.header("Content-Type", "application/x-www-form-urlencoded")
|
||||||
|
.form(&form_data)
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to send STS request: {}", e))?;
|
||||||
|
|
||||||
|
let status = response.status();
|
||||||
|
let body = response
|
||||||
|
.text()
|
||||||
|
.await
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to read STS response body: {}", e))?;
|
||||||
|
|
||||||
|
if !status.is_success() {
|
||||||
|
return Err(anyhow::anyhow!("STS request failed with status {}: {}", status, body));
|
||||||
|
}
|
||||||
|
|
||||||
|
let sts_response: StsResponse = serde_json::from_str(&body)
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to deserialize STS response: {}", e))?;
|
||||||
|
|
||||||
|
Ok(sts_response.response.result.credentials)
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
pub(super) async fn process_snapshot_to_s3(
|
pub(super) async fn process_snapshot_to_s3(
|
||||||
&self,
|
&self,
|
||||||
@@ -247,6 +318,8 @@ impl IndexScheduler {
|
|||||||
s3_snapshot_prefix,
|
s3_snapshot_prefix,
|
||||||
s3_access_key,
|
s3_access_key,
|
||||||
s3_secret_key,
|
s3_secret_key,
|
||||||
|
s3_role_arn,
|
||||||
|
s3_web_identity_token_file,
|
||||||
s3_max_in_flight_parts,
|
s3_max_in_flight_parts,
|
||||||
s3_compression_level: level,
|
s3_compression_level: level,
|
||||||
s3_signature_duration,
|
s3_signature_duration,
|
||||||
@@ -262,21 +335,40 @@ impl IndexScheduler {
|
|||||||
};
|
};
|
||||||
|
|
||||||
let (reader, writer) = std::io::pipe()?;
|
let (reader, writer) = std::io::pipe()?;
|
||||||
let uploader_task = tokio::spawn(multipart_stream_to_s3(
|
let uploader_task = tokio::spawn(async move {
|
||||||
s3_bucket_url,
|
let (s3_access_key, s3_secret_key, s3_token) =
|
||||||
s3_bucket_region,
|
if let (Some(role_arn), Some(token_file)) =
|
||||||
s3_bucket_name,
|
(s3_role_arn, s3_web_identity_token_file)
|
||||||
s3_snapshot_prefix,
|
{
|
||||||
s3_access_key,
|
let StsCredentials { access_key_id, secret_access_key, session_token } =
|
||||||
s3_secret_key,
|
Self::assume_role_with_web_identity(&role_arn, &token_file)
|
||||||
s3_max_in_flight_parts,
|
.await
|
||||||
s3_signature_duration,
|
.map_err(Error::Anyhow)?;
|
||||||
s3_multipart_part_size,
|
(access_key_id, secret_access_key, Some(session_token))
|
||||||
must_stop_processing,
|
} else if let (Some(access_key), Some(secret_key)) = (s3_access_key, s3_secret_key) {
|
||||||
retry_backoff,
|
(access_key, secret_key, None)
|
||||||
db_name,
|
} else {
|
||||||
reader,
|
return Err(Error::Anyhow(anyhow::anyhow!("Failed to parse args")))
|
||||||
));
|
};
|
||||||
|
|
||||||
|
multipart_stream_to_s3(
|
||||||
|
s3_bucket_url,
|
||||||
|
s3_bucket_region,
|
||||||
|
s3_bucket_name,
|
||||||
|
s3_snapshot_prefix,
|
||||||
|
s3_access_key,
|
||||||
|
s3_secret_key,
|
||||||
|
s3_token,
|
||||||
|
s3_max_in_flight_parts,
|
||||||
|
s3_signature_duration,
|
||||||
|
s3_multipart_part_size,
|
||||||
|
must_stop_processing,
|
||||||
|
retry_backoff,
|
||||||
|
db_name,
|
||||||
|
reader,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
});
|
||||||
|
|
||||||
let index_scheduler = IndexScheduler::private_clone(self);
|
let index_scheduler = IndexScheduler::private_clone(self);
|
||||||
let builder_task = tokio::task::spawn_blocking(move || {
|
let builder_task = tokio::task::spawn_blocking(move || {
|
||||||
@@ -430,6 +522,7 @@ async fn multipart_stream_to_s3(
|
|||||||
s3_snapshot_prefix: String,
|
s3_snapshot_prefix: String,
|
||||||
s3_access_key: String,
|
s3_access_key: String,
|
||||||
s3_secret_key: String,
|
s3_secret_key: String,
|
||||||
|
s3_token: Option<String>,
|
||||||
s3_max_in_flight_parts: std::num::NonZero<usize>,
|
s3_max_in_flight_parts: std::num::NonZero<usize>,
|
||||||
s3_signature_duration: std::time::Duration,
|
s3_signature_duration: std::time::Duration,
|
||||||
s3_multipart_part_size: u64,
|
s3_multipart_part_size: u64,
|
||||||
@@ -456,7 +549,10 @@ async fn multipart_stream_to_s3(
|
|||||||
s3_bucket_url.parse().map_err(BucketError::ParseError).map_err(Error::S3BucketError)?;
|
s3_bucket_url.parse().map_err(BucketError::ParseError).map_err(Error::S3BucketError)?;
|
||||||
let bucket = Bucket::new(url, UrlStyle::Path, s3_bucket_name, s3_bucket_region)
|
let bucket = Bucket::new(url, UrlStyle::Path, s3_bucket_name, s3_bucket_region)
|
||||||
.map_err(Error::S3BucketError)?;
|
.map_err(Error::S3BucketError)?;
|
||||||
let credential = Credentials::new(s3_access_key, s3_secret_key);
|
let credential = match s3_token {
|
||||||
|
Some(token) => Credentials::new_with_token(s3_access_key, s3_secret_key, token),
|
||||||
|
None => Credentials::new(s3_access_key, s3_secret_key),
|
||||||
|
};
|
||||||
|
|
||||||
// Note for the future (rust 1.91+): use with_added_extension, it's prettier
|
// Note for the future (rust 1.91+): use with_added_extension, it's prettier
|
||||||
let object_path = s3_snapshot_prefix.join(format!("{db_name}.snapshot"));
|
let object_path = s3_snapshot_prefix.join(format!("{db_name}.snapshot"));
|
||||||
|
|||||||
@@ -85,6 +85,8 @@ const MEILI_S3_BUCKET_NAME: &str = "MEILI_S3_BUCKET_NAME";
|
|||||||
const MEILI_S3_SNAPSHOT_PREFIX: &str = "MEILI_S3_SNAPSHOT_PREFIX";
|
const MEILI_S3_SNAPSHOT_PREFIX: &str = "MEILI_S3_SNAPSHOT_PREFIX";
|
||||||
const MEILI_S3_ACCESS_KEY: &str = "MEILI_S3_ACCESS_KEY";
|
const MEILI_S3_ACCESS_KEY: &str = "MEILI_S3_ACCESS_KEY";
|
||||||
const MEILI_S3_SECRET_KEY: &str = "MEILI_S3_SECRET_KEY";
|
const MEILI_S3_SECRET_KEY: &str = "MEILI_S3_SECRET_KEY";
|
||||||
|
const MEILI_S3_ROLE_ARN: &str = "MEILI_S3_ROLE_ARN";
|
||||||
|
const MEILI_S3_WEB_IDENTITY_TOKEN_FILE: &str = "MEILI_S3_WEB_IDENTITY_TOKEN_FILE";
|
||||||
const MEILI_EXPERIMENTAL_S3_MAX_IN_FLIGHT_PARTS: &str = "MEILI_EXPERIMENTAL_S3_MAX_IN_FLIGHT_PARTS";
|
const MEILI_EXPERIMENTAL_S3_MAX_IN_FLIGHT_PARTS: &str = "MEILI_EXPERIMENTAL_S3_MAX_IN_FLIGHT_PARTS";
|
||||||
const MEILI_EXPERIMENTAL_S3_COMPRESSION_LEVEL: &str = "MEILI_EXPERIMENTAL_S3_COMPRESSION_LEVEL";
|
const MEILI_EXPERIMENTAL_S3_COMPRESSION_LEVEL: &str = "MEILI_EXPERIMENTAL_S3_COMPRESSION_LEVEL";
|
||||||
const MEILI_EXPERIMENTAL_S3_SIGNATURE_DURATION_SECONDS: &str =
|
const MEILI_EXPERIMENTAL_S3_SIGNATURE_DURATION_SECONDS: &str =
|
||||||
@@ -942,7 +944,8 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
|
|||||||
// This group is a bit tricky but makes it possible to require all listed fields if one of them
|
// This group is a bit tricky but makes it possible to require all listed fields if one of them
|
||||||
// is specified. It lets us keep an Option for the S3SnapshotOpts configuration.
|
// is specified. It lets us keep an Option for the S3SnapshotOpts configuration.
|
||||||
// <https://github.com/clap-rs/clap/issues/5092#issuecomment-2616986075>
|
// <https://github.com/clap-rs/clap/issues/5092#issuecomment-2616986075>
|
||||||
#[group(requires_all = ["s3_bucket_url", "s3_bucket_region", "s3_bucket_name", "s3_snapshot_prefix", "s3_access_key", "s3_secret_key"])]
|
#[group(requires_all = ["s3_bucket_url", "s3_bucket_region", "s3_bucket_name", "s3_snapshot_prefix"])]
|
||||||
|
#[group(requires = "s3_auth")]
|
||||||
pub struct S3SnapshotOpts {
|
pub struct S3SnapshotOpts {
|
||||||
/// The S3 bucket URL in the format https://s3.<region>.amazonaws.com.
|
/// The S3 bucket URL in the format https://s3.<region>.amazonaws.com.
|
||||||
#[clap(long, env = MEILI_S3_BUCKET_URL, required = false)]
|
#[clap(long, env = MEILI_S3_BUCKET_URL, required = false)]
|
||||||
@@ -964,15 +967,51 @@ pub struct S3SnapshotOpts {
|
|||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub s3_snapshot_prefix: String,
|
pub s3_snapshot_prefix: String,
|
||||||
|
|
||||||
/// The S3 access key.
|
/// The S3 access key. Conflicts with --s3-role-arn and --s3-web-identity-token-file.
|
||||||
#[clap(long, env = MEILI_S3_ACCESS_KEY, required = false)]
|
#[clap(
|
||||||
|
long,
|
||||||
|
env = MEILI_S3_ACCESS_KEY,
|
||||||
|
required = false,
|
||||||
|
group = "s3_auth",
|
||||||
|
requires = "s3_secret_key",
|
||||||
|
conflicts_with_all = ["s3_role_arn", "s3_web_identity_token_file"]
|
||||||
|
)]
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub s3_access_key: String,
|
pub s3_access_key: Option<String>,
|
||||||
|
|
||||||
/// The S3 secret key.
|
/// The S3 secret key. Conflicts with --s3-role-arn and --s3-web-identity-token-file.
|
||||||
#[clap(long, env = MEILI_S3_SECRET_KEY, required = false)]
|
#[clap(
|
||||||
|
long,
|
||||||
|
env = MEILI_S3_SECRET_KEY,
|
||||||
|
required = false,
|
||||||
|
requires = "s3_access_key",
|
||||||
|
conflicts_with_all = ["s3_role_arn", "s3_web_identity_token_file"]
|
||||||
|
)]
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub s3_secret_key: String,
|
pub s3_secret_key: Option<String>,
|
||||||
|
|
||||||
|
/// The IAM role ARN for web identity federation. Conflicts with --s3-access-key and --s3-secret-key.
|
||||||
|
#[clap(
|
||||||
|
long,
|
||||||
|
env = MEILI_S3_ROLE_ARN,
|
||||||
|
required = false,
|
||||||
|
group = "s3_auth",
|
||||||
|
requires = "s3_web_identity_token_file",
|
||||||
|
conflicts_with_all = ["s3_access_key", "s3_secret_key"]
|
||||||
|
)]
|
||||||
|
#[serde(default)]
|
||||||
|
pub s3_role_arn: Option<String>,
|
||||||
|
|
||||||
|
/// The path to the web identity token file. Conflicts with --s3-access-key and --s3-secret-key.
|
||||||
|
#[clap(
|
||||||
|
long,
|
||||||
|
env = MEILI_S3_WEB_IDENTITY_TOKEN_FILE,
|
||||||
|
required = false,
|
||||||
|
requires = "s3_role_arn",
|
||||||
|
conflicts_with_all = ["s3_access_key", "s3_secret_key"]
|
||||||
|
)]
|
||||||
|
#[serde(default)]
|
||||||
|
pub s3_web_identity_token_file: Option<String>,
|
||||||
|
|
||||||
/// The maximum number of parts that can be uploaded in parallel.
|
/// The maximum number of parts that can be uploaded in parallel.
|
||||||
///
|
///
|
||||||
@@ -1017,6 +1056,8 @@ impl S3SnapshotOpts {
|
|||||||
s3_snapshot_prefix,
|
s3_snapshot_prefix,
|
||||||
s3_access_key,
|
s3_access_key,
|
||||||
s3_secret_key,
|
s3_secret_key,
|
||||||
|
s3_role_arn,
|
||||||
|
s3_web_identity_token_file,
|
||||||
experimental_s3_max_in_flight_parts,
|
experimental_s3_max_in_flight_parts,
|
||||||
experimental_s3_compression_level,
|
experimental_s3_compression_level,
|
||||||
experimental_s3_signature_duration_seconds,
|
experimental_s3_signature_duration_seconds,
|
||||||
@@ -1027,8 +1068,18 @@ impl S3SnapshotOpts {
|
|||||||
export_to_env_if_not_present(MEILI_S3_BUCKET_REGION, s3_bucket_region);
|
export_to_env_if_not_present(MEILI_S3_BUCKET_REGION, s3_bucket_region);
|
||||||
export_to_env_if_not_present(MEILI_S3_BUCKET_NAME, s3_bucket_name);
|
export_to_env_if_not_present(MEILI_S3_BUCKET_NAME, s3_bucket_name);
|
||||||
export_to_env_if_not_present(MEILI_S3_SNAPSHOT_PREFIX, s3_snapshot_prefix);
|
export_to_env_if_not_present(MEILI_S3_SNAPSHOT_PREFIX, s3_snapshot_prefix);
|
||||||
export_to_env_if_not_present(MEILI_S3_ACCESS_KEY, s3_access_key);
|
if let Some(key) = s3_access_key {
|
||||||
export_to_env_if_not_present(MEILI_S3_SECRET_KEY, s3_secret_key);
|
export_to_env_if_not_present(MEILI_S3_ACCESS_KEY, key);
|
||||||
|
}
|
||||||
|
if let Some(key) = s3_secret_key {
|
||||||
|
export_to_env_if_not_present(MEILI_S3_SECRET_KEY, key);
|
||||||
|
}
|
||||||
|
if let Some(arn) = s3_role_arn {
|
||||||
|
export_to_env_if_not_present(MEILI_S3_ROLE_ARN, arn);
|
||||||
|
}
|
||||||
|
if let Some(path) = s3_web_identity_token_file {
|
||||||
|
export_to_env_if_not_present(MEILI_S3_WEB_IDENTITY_TOKEN_FILE, path);
|
||||||
|
}
|
||||||
export_to_env_if_not_present(
|
export_to_env_if_not_present(
|
||||||
MEILI_EXPERIMENTAL_S3_MAX_IN_FLIGHT_PARTS,
|
MEILI_EXPERIMENTAL_S3_MAX_IN_FLIGHT_PARTS,
|
||||||
experimental_s3_max_in_flight_parts.to_string(),
|
experimental_s3_max_in_flight_parts.to_string(),
|
||||||
@@ -1059,6 +1110,8 @@ impl TryFrom<S3SnapshotOpts> for S3SnapshotOptions {
|
|||||||
s3_snapshot_prefix,
|
s3_snapshot_prefix,
|
||||||
s3_access_key,
|
s3_access_key,
|
||||||
s3_secret_key,
|
s3_secret_key,
|
||||||
|
s3_role_arn,
|
||||||
|
s3_web_identity_token_file,
|
||||||
experimental_s3_max_in_flight_parts,
|
experimental_s3_max_in_flight_parts,
|
||||||
experimental_s3_compression_level,
|
experimental_s3_compression_level,
|
||||||
experimental_s3_signature_duration_seconds,
|
experimental_s3_signature_duration_seconds,
|
||||||
@@ -1072,6 +1125,8 @@ impl TryFrom<S3SnapshotOpts> for S3SnapshotOptions {
|
|||||||
s3_snapshot_prefix,
|
s3_snapshot_prefix,
|
||||||
s3_access_key,
|
s3_access_key,
|
||||||
s3_secret_key,
|
s3_secret_key,
|
||||||
|
s3_role_arn,
|
||||||
|
s3_web_identity_token_file,
|
||||||
s3_max_in_flight_parts: experimental_s3_max_in_flight_parts,
|
s3_max_in_flight_parts: experimental_s3_max_in_flight_parts,
|
||||||
s3_compression_level: experimental_s3_compression_level,
|
s3_compression_level: experimental_s3_compression_level,
|
||||||
s3_signature_duration: Duration::from_secs(experimental_s3_signature_duration_seconds),
|
s3_signature_duration: Duration::from_secs(experimental_s3_signature_duration_seconds),
|
||||||
|
|||||||
@@ -47,8 +47,10 @@ pub struct S3SnapshotOptions {
|
|||||||
pub s3_bucket_region: String,
|
pub s3_bucket_region: String,
|
||||||
pub s3_bucket_name: String,
|
pub s3_bucket_name: String,
|
||||||
pub s3_snapshot_prefix: String,
|
pub s3_snapshot_prefix: String,
|
||||||
pub s3_access_key: String,
|
pub s3_access_key: Option<String>,
|
||||||
pub s3_secret_key: String,
|
pub s3_secret_key: Option<String>,
|
||||||
|
pub s3_role_arn: Option<String>,
|
||||||
|
pub s3_web_identity_token_file: Option<String>,
|
||||||
pub s3_max_in_flight_parts: NonZeroUsize,
|
pub s3_max_in_flight_parts: NonZeroUsize,
|
||||||
pub s3_compression_level: u32,
|
pub s3_compression_level: u32,
|
||||||
pub s3_signature_duration: Duration,
|
pub s3_signature_duration: Duration,
|
||||||
|
|||||||
Reference in New Issue
Block a user