Compare commits

..

21 Commits

Author SHA1 Message Date
Louis Dureuil
119bb805e5 camel case the fields in "origin" 2025-07-31 17:32:45 +02:00
Louis Dureuil
9f10011525 Rename Body::with_file 2025-07-30 17:33:01 +02:00
Louis Dureuil
54a2029b1f Adjust timeouts 2025-07-30 17:32:54 +02:00
Louis Dureuil
4fe09f406c Don't always hardcode Content-Type in proxy 2025-07-29 17:52:48 +02:00
Louis Dureuil
9d95f8187d Update snap 2025-07-29 16:28:46 +02:00
Louis Dureuil
fb0904b893 Misc churn 2025-07-29 14:47:49 +02:00
Louis Dureuil
eb829e93c5 Move meilisearch_types::Network to its own module 2025-07-29 14:47:49 +02:00
Louis Dureuil
be36afe9e6 Make types Serialize and Deserialize for proxying 2025-07-29 14:47:49 +02:00
Louis Dureuil
27c087a4fb New errors 2025-07-29 14:47:49 +02:00
Louis Dureuil
c618877bce Dependency changes 2025-07-29 14:47:49 +02:00
Louis Dureuil
982e554639 IndexScheduler::update_task now merges the task.network and accepts &mut Task 2025-07-29 14:47:49 +02:00
Louis Dureuil
a8cfec19b5 IndexScheduler::set_task_network 2025-07-29 14:47:48 +02:00
Louis Dureuil
5d2b1c2637 file-store: persist returns the persisted File object 2025-07-29 14:47:48 +02:00
Louis Dureuil
65fefc7122 Dump support for network 2025-07-29 14:47:48 +02:00
Louis Dureuil
180cfd71cc Proxy all document tasks to the network when sharding is enabled 2025-07-29 14:47:48 +02:00
Louis Dureuil
614cd8cc1c Shard documents 2025-07-29 14:47:48 +02:00
Louis Dureuil
c6f6808981 network: add sharding to Network and writeApiKey to Remotes 2025-07-29 14:47:48 +02:00
Louis Dureuil
9cb77fd310 Add proxy module to proxy requests to members of a network 2025-07-29 14:47:48 +02:00
Louis Dureuil
fa8bd6430c Add new milli::update:🆕:indexer::sharding module 2025-07-29 14:47:48 +02:00
Louis Dureuil
4c016432e1 Add network to Task and TaskView 2025-07-29 14:47:48 +02:00
Louis Dureuil
0aa1f63061 Add EE license 2025-07-29 11:54:12 +02:00
141 changed files with 2201 additions and 3136 deletions

View File

@@ -1,26 +1,28 @@
---
name: New feature issue
about: ⚠️ Should only be used by the internal Meili team ⚠️
name: New sprint issue
about: ⚠️ Should only be used by the engine team ⚠️
title: ''
labels: 'impacts docs, impacts integrations'
labels: 'missing usage in PRD, impacts docs'
assignees: ''
---
Related product team resources: [PRD]() (_internal only_)
Related product discussion:
## Motivation
<!---Copy/paste the information in PRD or briefly detail the product motivation. Ask product team if any hesitation.-->
## Usage
<!---Link to the public part of the PRD, or to the related product discussion for experimental features-->
TBD
## TODO
<!---If necessary, create a list with technical/product steps-->
### Are you modifying a database?
- [ ] If not, add the `no db change` label to your PR, and you're good to merge.
- [ ] If yes, add the `db change` label to your PR. You'll receive a message explaining you what to do.
@@ -52,5 +54,5 @@ TBD
## Impacted teams
<!---Ping the related teams. Ask on Slack if any hesitation-->
<!---@meilisearch/docs-team and @meilisearch/integration-team when there is any API change, e.g. settings addition-->
<!---Ping the related teams. Ask for the engine manager if any hesitation-->
<!---@meilisearch/docs-team when there is any API change, e.g. settings addition-->

View File

@@ -1,16 +0,0 @@
## Related issue
Fixes #...
## Requirements
⚠️ Ensure the following requirements before merging ⚠️
- [ ] Automated tests have been added.
- [ ] If some tests cannot be automated, manual rigorous tests should be applied.
- [ ] ⚠️ If there is any change in the DB:
- [ ] Test that any impacted DB still works as expected after using `--experimental-dumpless-upgrade` on a DB created with the last released Meilisearch
- [ ] Test that during the upgrade, **search is still available** (artificially make the upgrade longer if needed)
- [ ] Set the `db change` label.
- [ ] If necessary, the feature have been tested in the Cloud production environment (with [prototypes](./documentation/prototypes.md)) and the Cloud UI is ready.
- [ ] If necessary, the [documentation](https://github.com/meilisearch/documentation) related to the implemented feature in the PR is ready.
- [ ] If necessary, the [integrations](https://github.com/meilisearch/integration-guides) related to the implemented feature in the PR are ready.

View File

@@ -1,33 +0,0 @@
name-template: 'v$RESOLVED_VERSION'
tag-template: 'v$RESOLVED_VERSION'
exclude-labels:
- 'skip changelog'
version-resolver:
minor:
labels:
- 'enhancement'
default: patch
categories:
- title: '⚠️ Breaking changes'
label: 'breaking-change'
- title: '🚀 Enhancements'
label: 'enhancement'
- title: '🐛 Bug Fixes'
label: 'bug'
- title: '🔒 Security'
label: 'security'
- title: '⚙️ Maintenance/misc'
label:
- 'maintenance'
- 'documentation'
template: |
$CHANGES
❤️ Huge thanks to our contributors: $CONTRIBUTORS.
no-changes-template: 'Changes are coming soon 😎'
sort-direction: 'ascending'
replacers:
- search: '/(?:and )?@dependabot-preview(?:\[bot\])?,?/g'
replace: ''
- search: '/(?:and )?@dependabot(?:\[bot\])?,?/g'
replace: ''

View File

@@ -1,22 +0,0 @@
This issue is about updating Meilisearch dependencies:
- [ ] Update Meilisearch dependencies with the help of `cargo +nightly udeps --all-targets` (remove unused dependencies) and `cargo upgrade` (upgrade dependencies versions) - ⚠️ Some repositories may contain subdirectories (like heed, charabia, or deserr). Take care of updating these in the main crate as well. This won't be done automatically by `cargo upgrade`.
- [ ] [deserr](https://github.com/meilisearch/deserr)
- [ ] [charabia](https://github.com/meilisearch/charabia/)
- [ ] [heed](https://github.com/meilisearch/heed/)
- [ ] [roaring-rs](https://github.com/RoaringBitmap/roaring-rs/)
- [ ] [obkv](https://github.com/meilisearch/obkv)
- [ ] [grenad](https://github.com/meilisearch/grenad/)
- [ ] [arroy](https://github.com/meilisearch/arroy/)
- [ ] [segment](https://github.com/meilisearch/segment)
- [ ] [bumparaw-collections](https://github.com/meilisearch/bumparaw-collections)
- [ ] [bbqueue](https://github.com/meilisearch/bbqueue)
- [ ] Finally, [Meilisearch](https://github.com/meilisearch/MeiliSearch)
- [ ] If new Rust versions have been released, update the minimal Rust version in use at Meilisearch:
- [ ] in this [GitHub Action file](https://github.com/meilisearch/meilisearch/blob/main/.github/workflows/test-suite.yml), by changing the `toolchain` field of the `rustfmt` job to the latest available nightly (of the day before or the current day).
- [ ] in every [GitHub Action files](https://github.com/meilisearch/meilisearch/blob/main/.github/workflows), by changing all the `dtolnay/rust-toolchain@` references to use the latest stable version.
- [ ] in this [`rust-toolchain.toml`](https://github.com/meilisearch/meilisearch/blob/main/rust-toolchain.toml), by changing the `channel` field to the latest stable version.
- [ ] in the [Dockerfile](https://github.com/meilisearch/meilisearch/blob/main/Dockerfile), by changing the base image to `rust:<target_rust_version>-alpine<alpine_version>`. Check that the image exists on [Dockerhub](https://hub.docker.com/_/rust/tags?page=1&name=alpine). Also, build and run the image to check everything still works!
⚠️ This issue should be prioritized to avoid any deprecation and vulnerability issues.
The GitHub action dependencies are managed by [Dependabot](https://github.com/meilisearch/meilisearch/blob/main/.github/dependabot.yml), so no need to update them when solving this issue.

View File

@@ -0,0 +1,100 @@
name: PR Milestone Check
on:
pull_request:
types: [opened, reopened, edited, synchronize, milestoned, demilestoned]
branches:
- "main"
- "release-v*.*.*"
jobs:
check-milestone:
name: Check PR Milestone
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Validate PR milestone
uses: actions/github-script@v7
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |
// Get PR number directly from the event payload
const prNumber = context.payload.pull_request.number;
// Get PR details
const { data: prData } = await github.rest.pulls.get({
owner: 'meilisearch',
repo: 'meilisearch',
pull_number: prNumber
});
// Get base branch name
const baseBranch = prData.base.ref;
console.log(`Base branch: ${baseBranch}`);
// Get PR milestone
const prMilestone = prData.milestone;
if (!prMilestone) {
core.setFailed('PR must have a milestone assigned');
return;
}
console.log(`PR milestone: ${prMilestone.title}`);
// Validate milestone format: vx.y.z
const milestoneRegex = /^v\d+\.\d+\.\d+$/;
if (!milestoneRegex.test(prMilestone.title)) {
core.setFailed(`Milestone "${prMilestone.title}" does not follow the required format vx.y.z`);
return;
}
// For main branch PRs, check if the milestone is the highest one
if (baseBranch === 'main') {
// Get all milestones
const { data: milestones } = await github.rest.issues.listMilestones({
owner: 'meilisearch',
repo: 'meilisearch',
state: 'open',
sort: 'due_on',
direction: 'desc'
});
// Sort milestones by version number (vx.y.z)
const sortedMilestones = milestones
.filter(m => milestoneRegex.test(m.title))
.sort((a, b) => {
const versionA = a.title.substring(1).split('.').map(Number);
const versionB = b.title.substring(1).split('.').map(Number);
// Compare major version
if (versionA[0] !== versionB[0]) return versionB[0] - versionA[0];
// Compare minor version
if (versionA[1] !== versionB[1]) return versionB[1] - versionA[1];
// Compare patch version
return versionB[2] - versionA[2];
});
if (sortedMilestones.length === 0) {
core.setFailed('No valid milestones found in the repository. Please create at least one milestone with the format vx.y.z');
return;
}
const highestMilestone = sortedMilestones[0];
console.log(`Highest milestone: ${highestMilestone.title}`);
if (prMilestone.title !== highestMilestone.title) {
core.setFailed(`PRs targeting the main branch must use the highest milestone (${highestMilestone.title}), but this PR uses ${prMilestone.title}`);
return;
}
} else {
// For release branches, the milestone should match the branch version
const branchVersion = baseBranch.substring(8); // remove 'release-'
if (prMilestone.title !== branchVersion) {
core.setFailed(`PRs targeting release branch "${baseBranch}" must use the matching milestone "${branchVersion}", but this PR uses "${prMilestone.title}"`);
return;
}
}
console.log('PR milestone validation passed!');

View File

@@ -15,7 +15,7 @@ jobs:
steps:
- uses: actions/checkout@v3
- name: Download the issue template
run: curl -s https://raw.githubusercontent.com/meilisearch/meilisearch/main/.github/templates/dependency-issue.md > $ISSUE_TEMPLATE
run: curl -s https://raw.githubusercontent.com/meilisearch/engine-team/main/issue-templates/dependency-issue.md > $ISSUE_TEMPLATE
- name: Create issue
run: |
gh issue create \

View File

@@ -3,7 +3,7 @@ name: Look for flaky tests
on:
workflow_dispatch:
schedule:
- cron: '0 4 * * *' # Every day at 4:00AM
- cron: "0 12 * * FRI" # Every Friday at 12:00PM
jobs:
flaky:

224
.github/workflows/milestone-workflow.yml vendored Normal file
View File

@@ -0,0 +1,224 @@
name: Milestone's workflow
# /!\ No git flow are handled here
# For each Milestone created (not opened!), and if the release is NOT a patch release (only the patch changed)
# - the roadmap issue is created, see https://github.com/meilisearch/engine-team/blob/main/issue-templates/roadmap-issue.md
# - the changelog issue is created, see https://github.com/meilisearch/engine-team/blob/main/issue-templates/changelog-issue.md
# - update the ruleset to add the current release version to the list of allowed versions and be able to use the merge queue.
# For each Milestone closed
# - the `release_version` label is created
# - this label is applied to all issues/PRs in the Milestone
on:
milestone:
types: [created, closed]
env:
MILESTONE_VERSION: ${{ github.event.milestone.title }}
MILESTONE_URL: ${{ github.event.milestone.html_url }}
MILESTONE_DUE_ON: ${{ github.event.milestone.due_on }}
GH_TOKEN: ${{ secrets.MEILI_BOT_GH_PAT }}
jobs:
# -----------------
# MILESTONE CREATED
# -----------------
get-release-version:
if: github.event.action == 'created'
runs-on: ubuntu-latest
outputs:
is-patch: ${{ steps.check-patch.outputs.is-patch }}
steps:
- uses: actions/checkout@v3
- name: Check if this release is a patch release only
id: check-patch
run: |
echo version: $MILESTONE_VERSION
if [[ $MILESTONE_VERSION =~ ^v[0-9]+\.[0-9]+\.0$ ]]; then
echo 'This is NOT a patch release'
echo "is-patch=false" >> $GITHUB_OUTPUT
elif [[ $MILESTONE_VERSION =~ ^v[0-9]+\.[0-9]+\.[0-9]+$ ]]; then
echo 'This is a patch release'
echo "is-patch=true" >> $GITHUB_OUTPUT
else
echo "Not a valid format of release, check the Milestone's title."
echo 'Should be vX.Y.Z'
exit 1
fi
create-roadmap-issue:
needs: get-release-version
# Create the roadmap issue if the release is not only a patch release
if: github.event.action == 'created' && needs.get-release-version.outputs.is-patch == 'false'
runs-on: ubuntu-latest
env:
ISSUE_TEMPLATE: issue-template.md
steps:
- uses: actions/checkout@v3
- name: Download the issue template
run: curl -s https://raw.githubusercontent.com/meilisearch/engine-team/main/issue-templates/roadmap-issue.md > $ISSUE_TEMPLATE
- name: Replace all empty occurrences in the templates
run: |
# Replace all <<version>> occurrences
sed -i "s/<<version>>/$MILESTONE_VERSION/g" $ISSUE_TEMPLATE
# Replace all <<milestone_id>> occurrences
milestone_id=$(echo $MILESTONE_URL | cut -d '/' -f 7)
sed -i "s/<<milestone_id>>/$milestone_id/g" $ISSUE_TEMPLATE
# Replace release date if exists
if [[ ! -z $MILESTONE_DUE_ON ]]; then
date=$(echo $MILESTONE_DUE_ON | cut -d 'T' -f 1)
sed -i "s/Release date\: 20XX-XX-XX/Release date\: $date/g" $ISSUE_TEMPLATE
fi
- name: Create the issue
run: |
gh issue create \
--title "$MILESTONE_VERSION ROADMAP" \
--label 'epic,impacts docs,impacts integrations,impacts cloud' \
--body-file $ISSUE_TEMPLATE \
--milestone $MILESTONE_VERSION
create-changelog-issue:
needs: get-release-version
# Create the changelog issue if the release is not only a patch release
if: github.event.action == 'created' && needs.get-release-version.outputs.is-patch == 'false'
runs-on: ubuntu-latest
env:
ISSUE_TEMPLATE: issue-template.md
steps:
- uses: actions/checkout@v3
- name: Download the issue template
run: curl -s https://raw.githubusercontent.com/meilisearch/engine-team/main/issue-templates/changelog-issue.md > $ISSUE_TEMPLATE
- name: Replace all empty occurrences in the templates
run: |
# Replace all <<version>> occurrences
sed -i "s/<<version>>/$MILESTONE_VERSION/g" $ISSUE_TEMPLATE
# Replace all <<milestone_id>> occurrences
milestone_id=$(echo $MILESTONE_URL | cut -d '/' -f 7)
sed -i "s/<<milestone_id>>/$milestone_id/g" $ISSUE_TEMPLATE
- name: Create the issue
run: |
gh issue create \
--title "Create release changelogs for $MILESTONE_VERSION" \
--label 'impacts docs,documentation' \
--body-file $ISSUE_TEMPLATE \
--milestone $MILESTONE_VERSION \
--assignee curquiza
create-update-version-issue:
needs: get-release-version
# Create the update-version issue even if the release is a patch release
if: github.event.action == 'created'
runs-on: ubuntu-latest
env:
ISSUE_TEMPLATE: issue-template.md
steps:
- uses: actions/checkout@v3
- name: Download the issue template
run: curl -s https://raw.githubusercontent.com/meilisearch/engine-team/main/issue-templates/update-version-issue.md > $ISSUE_TEMPLATE
- name: Create the issue
run: |
gh issue create \
--title "Update version in Cargo.toml for $MILESTONE_VERSION" \
--label 'maintenance' \
--body-file $ISSUE_TEMPLATE \
--milestone $MILESTONE_VERSION
create-update-openapi-issue:
needs: get-release-version
# Create the openAPI issue if the release is not only a patch release
if: github.event.action == 'created' && needs.get-release-version.outputs.is-patch == 'false'
runs-on: ubuntu-latest
env:
ISSUE_TEMPLATE: issue-template.md
steps:
- uses: actions/checkout@v3
- name: Download the issue template
run: curl -s https://raw.githubusercontent.com/meilisearch/engine-team/main/issue-templates/update-openapi-issue.md > $ISSUE_TEMPLATE
- name: Create the issue
run: |
gh issue create \
--title "Update Open API file for $MILESTONE_VERSION" \
--label 'maintenance' \
--body-file $ISSUE_TEMPLATE \
--milestone $MILESTONE_VERSION
update-ruleset:
runs-on: ubuntu-latest
if: github.event.action == 'created'
steps:
- uses: actions/checkout@v3
- name: Install jq
run: |
sudo apt-get update
sudo apt-get install -y jq
- name: Update ruleset
env:
# gh api repos/meilisearch/meilisearch/rulesets --jq '.[] | {name: .name, id: .id}'
RULESET_ID: 4253297
BRANCH_NAME: ${{ github.event.inputs.branch_name }}
run: |
echo "RULESET_ID: ${{ env.RULESET_ID }}"
echo "BRANCH_NAME: ${{ env.BRANCH_NAME }}"
# Get current ruleset conditions
CONDITIONS=$(gh api repos/meilisearch/meilisearch/rulesets/${{ env.RULESET_ID }} --jq '{ conditions: .conditions }')
# Update the conditions by appending the milestone version
UPDATED_CONDITIONS=$(echo $CONDITIONS | jq '.conditions.ref_name.include += ["refs/heads/release-'${{ env.MILESTONE_VERSION }}'"]')
# Update the ruleset from stdin (-)
echo $UPDATED_CONDITIONS |
gh api repos/meilisearch/meilisearch/rulesets/${{ env.RULESET_ID }} \
--method PUT \
-H "Accept: application/vnd.github+json" \
-H "X-GitHub-Api-Version: 2022-11-28" \
--input -
# ----------------
# MILESTONE CLOSED
# ----------------
create-release-label:
if: github.event.action == 'closed'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Create the ${{ env.MILESTONE_VERSION }} label
run: |
label_description="PRs/issues solved in $MILESTONE_VERSION"
if [[ ! -z $MILESTONE_DUE_ON ]]; then
date=$(echo $MILESTONE_DUE_ON | cut -d 'T' -f 1)
label_description="$label_description released on $date"
fi
gh api repos/meilisearch/meilisearch/labels \
--method POST \
-H "Accept: application/vnd.github+json" \
-f name="$MILESTONE_VERSION" \
-f description="$label_description" \
-f color='ff5ba3'
labelize-all-milestone-content:
if: github.event.action == 'closed'
needs: create-release-label
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Add label ${{ env.MILESTONE_VERSION }} to all PRs in the Milestone
run: |
prs=$(gh pr list --search milestone:"$MILESTONE_VERSION" --limit 1000 --state all --json number --template '{{range .}}{{tablerow (printf "%v" .number)}}{{end}}')
for pr in $prs; do
gh pr edit $pr --add-label $MILESTONE_VERSION
done
- name: Add label ${{ env.MILESTONE_VERSION }} to all issues in the Milestone
run: |
issues=$(gh issue list --search milestone:"$MILESTONE_VERSION" --limit 1000 --state all --json number --template '{{range .}}{{tablerow (printf "%v" .number)}}{{end}}')
for issue in $issues; do
gh issue edit $issue --add-label $MILESTONE_VERSION
done

View File

@@ -32,7 +32,7 @@ jobs:
- name: Build deb package
run: cargo deb -p meilisearch -o target/debian/meilisearch.deb
- name: Upload debian pkg to release
uses: svenstaro/upload-release-action@2.11.2
uses: svenstaro/upload-release-action@2.11.1
with:
repo_token: ${{ secrets.MEILI_BOT_GH_PAT }}
file: target/debian/meilisearch.deb

View File

@@ -51,7 +51,7 @@ jobs:
# No need to upload binaries for dry run (cron)
- name: Upload binaries to release
if: github.event_name == 'release'
uses: svenstaro/upload-release-action@2.11.2
uses: svenstaro/upload-release-action@2.11.1
with:
repo_token: ${{ secrets.MEILI_BOT_GH_PAT }}
file: target/release/meilisearch
@@ -81,7 +81,7 @@ jobs:
# No need to upload binaries for dry run (cron)
- name: Upload binaries to release
if: github.event_name == 'release'
uses: svenstaro/upload-release-action@2.11.2
uses: svenstaro/upload-release-action@2.11.1
with:
repo_token: ${{ secrets.MEILI_BOT_GH_PAT }}
file: target/release/${{ matrix.artifact_name }}
@@ -113,7 +113,7 @@ jobs:
- name: Upload the binary to release
# No need to upload binaries for dry run (cron)
if: github.event_name == 'release'
uses: svenstaro/upload-release-action@2.11.2
uses: svenstaro/upload-release-action@2.11.1
with:
repo_token: ${{ secrets.MEILI_BOT_GH_PAT }}
file: target/${{ matrix.target }}/release/meilisearch
@@ -178,7 +178,7 @@ jobs:
- name: Upload the binary to release
# No need to upload binaries for dry run (cron)
if: github.event_name == 'release'
uses: svenstaro/upload-release-action@2.11.2
uses: svenstaro/upload-release-action@2.11.1
with:
repo_token: ${{ secrets.MEILI_BOT_GH_PAT }}
file: target/${{ matrix.target }}/release/meilisearch

View File

@@ -16,8 +16,6 @@ on:
jobs:
docker:
runs-on: docker
permissions:
id-token: write # This is needed to use Cosign in keyless mode
steps:
- uses: actions/checkout@v3
@@ -64,9 +62,6 @@ jobs:
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Install cosign
uses: sigstore/cosign-installer@d58896d6a1865668819e1d91763c7751a165e159 # tag=v3.9.2
- name: Login to Docker Hub
uses: docker/login-action@v3
with:
@@ -90,7 +85,6 @@ jobs:
- name: Build and push
uses: docker/build-push-action@v6
id: build-and-push
with:
push: true
platforms: linux/amd64,linux/arm64
@@ -100,17 +94,6 @@ jobs:
COMMIT_DATE=${{ steps.build-metadata.outputs.date }}
GIT_TAG=${{ github.ref_name }}
- name: Sign the images with GitHub OIDC Token
env:
DIGEST: ${{ steps.build-and-push.outputs.digest }}
TAGS: ${{ steps.meta.outputs.tags }}
run: |
images=""
for tag in ${TAGS}; do
images+="${tag}@${DIGEST} "
done
cosign sign --yes ${images}
# /!\ Don't touch this without checking with Cloud team
- name: Send CI information to Cloud team
# Do not send if nightly build (i.e. 'schedule' or 'workflow_dispatch' event)

View File

@@ -1,20 +0,0 @@
name: Release Drafter
permissions:
contents: read
pull-requests: write
on:
push:
branches:
- main
jobs:
update_release_draft:
runs-on: ubuntu-latest
steps:
- uses: release-drafter/release-drafter@v6
with:
config-name: release-draft-template.yml
env:
GITHUB_TOKEN: ${{ secrets.RELEASE_DRAFTER_TOKEN }}

View File

@@ -9,7 +9,7 @@ on:
required: false
default: nightly
schedule:
- cron: '0 6 * * *' # Every day at 6:00am
- cron: "0 6 * * MON" # Every Monday at 6:00AM
env:
MEILI_MASTER_KEY: 'masterKey'
@@ -114,7 +114,7 @@ jobs:
dep ensure
fi
- name: Run integration tests
run: go test --race -v ./integration
run: go test -v ./...
meilisearch-java-tests:
needs: define-docker-image
@@ -344,23 +344,15 @@ jobs:
MEILI_NO_ANALYTICS: ${{ env.MEILI_NO_ANALYTICS }}
ports:
- '7700:7700'
env:
RAILS_VERSION: '7.0'
steps:
- uses: actions/checkout@v3
with:
repository: meilisearch/meilisearch-rails
- name: Install SQLite dependencies
run: sudo apt-get update && sudo apt-get install -y libsqlite3-dev
- name: Set up Ruby
- name: Set up Ruby 3
uses: ruby/setup-ruby@v1
with:
ruby-version: 3
bundler-cache: true
- name: Start MongoDB
uses: supercharge/mongodb-github-action@1.12.0
with:
mongodb-version: 8.0
- name: Run tests
run: bundle exec rspec

View File

@@ -3,7 +3,7 @@ name: Test suite
on:
workflow_dispatch:
schedule:
# Every day at 5:00am
# Everyday at 5:00am
- cron: "0 5 * * *"
pull_request:
merge_group:

View File

@@ -106,13 +106,7 @@ Run `cargo xtask --help` from the root of the repository to find out what is ava
#### Update the openAPI file if the APIchanged
To update the openAPI file in the code, see [sprint_issue.md](https://github.com/meilisearch/meilisearch/blob/main/.github/ISSUE_TEMPLATE/sprint_issue.md#reminders-when-modifying-the-api).
If you want to update the openAPI file on the [open-api repository](https://github.com/meilisearch/open-api):
- Pull the latest version of the latest rc of Meilisearch `git checkout release-vX.Y.Z; git pull`
- Starts Meilisearch with the `swagger` feature flag: `cargo run --features swagger`
- On a browser, open the following URL: http://localhost:7700/scalar
- Click the « Download openAPI file »
- Open a PR replacing [this file](https://github.com/meilisearch/open-api/blob/main/open-api.json) with the one downloaded
If you want to update the openAPI file on the [open-api repository](https://github.com/meilisearch/open-api), see [update-openapi-issue.md](https://github.com/meilisearch/engine-team/blob/main/issue-templates/update-openapi-issue.md).
### Logging
@@ -166,37 +160,25 @@ Some notes on GitHub PRs:
The draft PRs are recommended when you want to show that you are working on something and make your work visible.
- The branch related to the PR must be **up-to-date with `main`** before merging. Fortunately, this project uses [GitHub Merge Queues](https://github.blog/news-insights/product-news/github-merge-queue-is-generally-available/) to automatically enforce this requirement without the PR author having to rebase manually.
## Merging PRs
This project uses GitHub Merge Queues that helps us manage pull requests merging.
Before merging a PR, the maintainer should ensure the following requirements are met
- Automated tests have been added.
- If some tests cannot be automated, manual rigorous tests should be applied.
- ⚠️ If there is an change in the DB: it's mandatory to manually test the `--experimental-dumpless-upgrade` on a DB of the previous Meilisearch minor version (e.g. v1.13 for the v1.14 release).
- If necessary, the feature have been tested in the Cloud production environment (with [prototypes](./documentation/prototypes.md)) and the Cloud UI is ready.
- If necessary, the [documentation](https://github.com/meilisearch/documentation) related to the implemented feature in the PR is ready.
- If necessary, the [integrations](https://github.com/meilisearch/integration-guides) related to the implemented feature in the PR are ready.
## Publish Process (for internal team only)
## Release Process (for internal team only)
Meilisearch tools follow the [Semantic Versioning Convention](https://semver.org/).
### How to publish a new release
### Automation to rebase and Merge the PRs
The full Meilisearch release process is described in [this guide](./documentation/release.md).
This project uses GitHub Merge Queues that helps us manage pull requests merging.
### How to Publish a new Release
The full Meilisearch release process is described in [this guide](https://github.com/meilisearch/engine-team/blob/main/resources/meilisearch-release.md). Please follow it carefully before doing any release.
### How to publish a prototype
Depending on the developed feature, you might need to provide a prototyped version of Meilisearch to make it easier to test by the users.
This happens in two steps:
- [Release the prototype](./documentation/prototypes.md#how-to-publish-a-prototype)
- [Communicate about it](./documentation/prototypes.md#communication)
### How to implement and publish an experimental feature
Here is our [guidelines and process](./documentation/experimental-features.md) to implement and publish an experimental feature.
- [Release the prototype](https://github.com/meilisearch/engine-team/blob/main/resources/prototypes.md#how-to-publish-a-prototype)
- [Communicate about it](https://github.com/meilisearch/engine-team/blob/main/resources/prototypes.md#communication)
### Release assets

42
Cargo.lock generated
View File

@@ -580,7 +580,7 @@ source = "git+https://github.com/meilisearch/bbqueue#cbb87cc707b5af415ef203bdaf2
[[package]]
name = "benchmarks"
version = "1.17.0"
version = "1.16.0"
dependencies = [
"anyhow",
"bumpalo",
@@ -770,7 +770,7 @@ dependencies = [
[[package]]
name = "build-info"
version = "1.17.0"
version = "1.16.0"
dependencies = [
"anyhow",
"time",
@@ -1774,7 +1774,7 @@ dependencies = [
[[package]]
name = "dump"
version = "1.17.0"
version = "1.16.0"
dependencies = [
"anyhow",
"big_s",
@@ -2006,7 +2006,7 @@ checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]]
name = "file-store"
version = "1.17.0"
version = "1.16.0"
dependencies = [
"tempfile",
"thiserror 2.0.12",
@@ -2028,7 +2028,7 @@ dependencies = [
[[package]]
name = "filter-parser"
version = "1.17.0"
version = "1.16.0"
dependencies = [
"insta",
"nom",
@@ -2049,7 +2049,7 @@ dependencies = [
[[package]]
name = "flatten-serde-json"
version = "1.17.0"
version = "1.16.0"
dependencies = [
"criterion",
"serde_json",
@@ -2194,7 +2194,7 @@ dependencies = [
[[package]]
name = "fuzzers"
version = "1.17.0"
version = "1.16.0"
dependencies = [
"arbitrary",
"bumpalo",
@@ -2994,7 +2994,7 @@ dependencies = [
[[package]]
name = "index-scheduler"
version = "1.17.0"
version = "1.16.0"
dependencies = [
"anyhow",
"backoff",
@@ -3230,7 +3230,7 @@ dependencies = [
[[package]]
name = "json-depth-checker"
version = "1.17.0"
version = "1.16.0"
dependencies = [
"criterion",
"serde_json",
@@ -3724,7 +3724,7 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "meili-snap"
version = "1.17.0"
version = "1.16.0"
dependencies = [
"insta",
"md5",
@@ -3735,7 +3735,7 @@ dependencies = [
[[package]]
name = "meilisearch"
version = "1.17.0"
version = "1.16.0"
dependencies = [
"actix-cors",
"actix-http",
@@ -3745,6 +3745,7 @@ dependencies = [
"actix-web-lab",
"anyhow",
"async-openai",
"backoff",
"brotli",
"bstr",
"build-info",
@@ -3831,7 +3832,7 @@ dependencies = [
[[package]]
name = "meilisearch-auth"
version = "1.17.0"
version = "1.16.0"
dependencies = [
"base64 0.22.1",
"enum-iterator",
@@ -3850,7 +3851,7 @@ dependencies = [
[[package]]
name = "meilisearch-types"
version = "1.17.0"
version = "1.16.0"
dependencies = [
"actix-web",
"anyhow",
@@ -3885,7 +3886,7 @@ dependencies = [
[[package]]
name = "meilitool"
version = "1.17.0"
version = "1.16.0"
dependencies = [
"anyhow",
"clap",
@@ -3919,7 +3920,7 @@ dependencies = [
[[package]]
name = "milli"
version = "1.17.0"
version = "1.16.0"
dependencies = [
"allocator-api2 0.3.0",
"arroy",
@@ -3988,6 +3989,7 @@ dependencies = [
"time",
"tokenizers",
"tracing",
"twox-hash",
"ureq",
"url",
"utoipa",
@@ -4471,7 +4473,7 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
name = "permissive-json-pointer"
version = "1.17.0"
version = "1.16.0"
dependencies = [
"big_s",
"serde_json",
@@ -6430,6 +6432,12 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
[[package]]
name = "twox-hash"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b907da542cbced5261bd3256de1b3a1bf340a3d37f93425a07362a1d687de56"
[[package]]
name = "typeid"
version = "1.0.3"
@@ -7259,7 +7267,7 @@ dependencies = [
[[package]]
name = "xtask"
version = "1.17.0"
version = "1.16.0"
dependencies = [
"anyhow",
"build-info",

View File

@@ -22,7 +22,7 @@ members = [
]
[workspace.package]
version = "1.17.0"
version = "1.16.0"
authors = [
"Quentin de Quelen <quentin@dequelen.me>",
"Clément Renault <clement@meilisearch.com>",

View File

@@ -19,3 +19,11 @@ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
---
🔒 Meilisearch Enterprise Edition (EE)
Certain parts of this codebase are not licensed under the MIT license and governed by the Business Source License 1.1.
See the LICENSE-EE file for details.

67
LICENSE-EE Normal file
View File

@@ -0,0 +1,67 @@
Business Source License 1.1 Adapted for Meili SAS
This license is based on the Business Source License version 1.1, as published by MariaDB Corporation Ab.
Parameters
Licensor: Meili SAS
Licensed Work: Any file explicitly marked as “Enterprise Edition (EE)” or “governed by the Business Source License”.
Additional Use Grant:
You may use, modify, and distribute the Licensed Work for non-production purposes only, such as testing, development, or evaluation.
Production use of the Licensed Work requires a commercial license agreement with Meilisearch. Contact bonjour@meilisearch.com for licensing.
Change License: MIT
Change Date: Four years from the date the Licensed Work is published.
This License does not apply to any code outside of the Licensed Work, which remains under the MIT license.
For information about alternative licensing arrangements for the Licensed Work,
please contact bonjour@meilisearch.com or sales@meilisearch.com.
Notice
Business Source License 1.1
Terms
The Licensor hereby grants you the right to copy, modify, create derivative
works, redistribute, and make non-production use of the Licensed Work. The
Licensor may make an Additional Use Grant, above, permitting limited production use.
Effective on the Change Date, or the fourth anniversary of the first publicly
available distribution of a specific version of the Licensed Work under this
License, whichever comes first, the Licensor hereby grants you rights under
the terms of the Change License, and the rights granted in the paragraph
above terminate.
If your use of the Licensed Work does not comply with the requirements
currently in effect as described in this License, you must purchase a
commercial license from the Licensor, its affiliated entities, or authorized
resellers, or you must refrain from using the Licensed Work.
All copies of the original and modified Licensed Work, and derivative works
of the Licensed Work, are subject to this License. This License applies
separately for each version of the Licensed Work and the Change Date may vary
for each version of the Licensed Work released by Licensor.
You must conspicuously display this License on each original or modified copy
of the Licensed Work. If you receive the Licensed Work in original or
modified form from a third party, the terms and conditions set forth in this
License apply to your use of that work.
Any use of the Licensed Work in violation of this License will automatically
terminate your rights under this License for the current and all other
versions of the Licensed Work.
This License does not grant you any right in any trademark or logo of
Licensor or its affiliates (provided that you may use a trademark or logo of
Licensor as expressly required by this License).
TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
AN "AS IS" BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
TITLE.

View File

@@ -89,6 +89,26 @@ We also offer a wide range of dedicated guides to all Meilisearch features, such
Finally, for more in-depth information, refer to our articles explaining fundamental Meilisearch concepts such as [documents](https://www.meilisearch.com/docs/learn/core_concepts/documents?utm_campaign=oss&utm_source=github&utm_medium=meilisearch&utm_content=advanced) and [indexes](https://www.meilisearch.com/docs/learn/core_concepts/indexes?utm_campaign=oss&utm_source=github&utm_medium=meilisearch&utm_content=advanced).
## 🧾 Editions & Licensing
Meilisearch is available in two editions:
### 🧪 Community Edition (CE)
- Fully open source under the [MIT license](./LICENSE)
- Core search engine with fast and relevant full-text, semantic or hybrid search
- Free to use for anyone, including commercial usage
### 🏢 Enterprise Edition (EE)
- Includes advanced features such as:
- Sharding
- Governed by a [commercial license](./LICENSE-EE) or the [Business Source License 1.1](https://mariadb.com/bsl11)
- Not allowed in production without a commercial agreement with Meilisearch.
- You may use, modify, and distribute the Licensed Work for non-production purposes only, such as testing, development, or evaluation.
Want access to Enterprise features? → Contact us at [sales@meilisearch.com](maito:sales@meilisearch.com).
## 📊 Telemetry
Meilisearch collects **anonymized** user data to help us improve our product. You can [deactivate this](https://www.meilisearch.com/docs/learn/what_is_meilisearch/telemetry?utm_campaign=oss&utm_source=github&utm_medium=meilisearch&utm_content=telemetry#how-to-disable-data-collection) whenever you want.
@@ -119,6 +139,6 @@ Meilisearch is, and will always be, open-source! If you want to contribute to th
Meilisearch releases and their associated binaries are available on the project's [releases page](https://github.com/meilisearch/meilisearch/releases).
The binaries are versioned following [SemVer conventions](https://semver.org/). To know more, read our [versioning policy](./documentation/versioning-policy.md).
The binaries are versioned following [SemVer conventions](https://semver.org/). To know more, read our [versioning policy](https://github.com/meilisearch/engine-team/blob/main/resources/versioning-policy.md).
Differently from the binaries, crates in this repository are not currently available on [crates.io](https://crates.io/) and do not follow [SemVer conventions](https://semver.org).

View File

@@ -55,7 +55,3 @@ harness = false
[[bench]]
name = "sort"
harness = false
[[bench]]
name = "filter_starts_with"
harness = false

View File

@@ -1,66 +0,0 @@
mod datasets_paths;
mod utils;
use criterion::{criterion_group, criterion_main};
use milli::update::Settings;
use milli::FilterableAttributesRule;
use utils::Conf;
#[cfg(not(windows))]
#[global_allocator]
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
fn base_conf(builder: &mut Settings) {
let displayed_fields = ["geonameid", "name"].iter().map(|s| s.to_string()).collect();
builder.set_displayed_fields(displayed_fields);
let filterable_fields =
["name"].iter().map(|s| FilterableAttributesRule::Field(s.to_string())).collect();
builder.set_filterable_fields(filterable_fields);
}
#[rustfmt::skip]
const BASE_CONF: Conf = Conf {
dataset: datasets_paths::SMOL_ALL_COUNTRIES,
dataset_format: "jsonl",
queries: &[
"",
],
configure: base_conf,
primary_key: Some("geonameid"),
..Conf::BASE
};
fn filter_starts_with(c: &mut criterion::Criterion) {
#[rustfmt::skip]
let confs = &[
utils::Conf {
group_name: "1 letter",
filter: Some("name STARTS WITH e"),
..BASE_CONF
},
utils::Conf {
group_name: "2 letters",
filter: Some("name STARTS WITH es"),
..BASE_CONF
},
utils::Conf {
group_name: "3 letters",
filter: Some("name STARTS WITH est"),
..BASE_CONF
},
utils::Conf {
group_name: "6 letters",
filter: Some("name STARTS WITH estoni"),
..BASE_CONF
}
];
utils::run_benches(c, confs);
}
criterion_group!(benches, filter_starts_with);
criterion_main!(benches);

View File

@@ -154,6 +154,7 @@ fn indexing_songs_default(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -221,6 +222,7 @@ fn reindexing_songs_default(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -266,6 +268,7 @@ fn reindexing_songs_default(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -335,6 +338,7 @@ fn deleting_songs_in_batches_default(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -412,6 +416,7 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -457,6 +462,7 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -498,6 +504,7 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -566,6 +573,7 @@ fn indexing_songs_without_faceted_numbers(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -633,6 +641,7 @@ fn indexing_songs_without_faceted_fields(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -700,6 +709,7 @@ fn indexing_wiki(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -766,6 +776,7 @@ fn reindexing_wiki(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -811,6 +822,7 @@ fn reindexing_wiki(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -879,6 +891,7 @@ fn deleting_wiki_in_batches_default(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -956,6 +969,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -1002,6 +1016,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -1044,6 +1059,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -1111,6 +1127,7 @@ fn indexing_movies_default(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -1177,6 +1194,7 @@ fn reindexing_movies_default(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -1222,6 +1240,7 @@ fn reindexing_movies_default(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -1290,6 +1309,7 @@ fn deleting_movies_in_batches_default(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -1404,6 +1424,7 @@ fn indexing_movies_in_three_batches(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -1449,6 +1470,7 @@ fn indexing_movies_in_three_batches(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -1490,6 +1512,7 @@ fn indexing_movies_in_three_batches(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -1580,6 +1603,7 @@ fn indexing_nested_movies_default(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -1671,6 +1695,7 @@ fn deleting_nested_movies_in_batches_default(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -1754,6 +1779,7 @@ fn indexing_nested_movies_without_faceted_fields(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -1821,6 +1847,7 @@ fn indexing_geo(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -1887,6 +1914,7 @@ fn reindexing_geo(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -1932,6 +1960,7 @@ fn reindexing_geo(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();
@@ -2000,6 +2029,7 @@ fn deleting_geo_in_batches_default(c: &mut Criterion) {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();

View File

@@ -123,6 +123,7 @@ pub fn base_setup(conf: &Conf) -> Index {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();

View File

@@ -10,7 +10,7 @@ use meilisearch_types::keys::Key;
use meilisearch_types::milli::update::IndexDocumentsMethod;
use meilisearch_types::settings::Unchecked;
use meilisearch_types::tasks::{
Details, ExportIndexSettings, IndexSwap, KindWithContent, Status, Task, TaskId,
Details, ExportIndexSettings, IndexSwap, KindWithContent, Status, Task, TaskId, TaskNetwork,
};
use meilisearch_types::InstanceUid;
use roaring::RoaringBitmap;
@@ -94,6 +94,8 @@ pub struct TaskDump {
default
)]
pub finished_at: Option<OffsetDateTime>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub network: Option<TaskNetwork>,
}
// A `Kind` specific version made for the dump. If modified you may break the dump.
@@ -171,6 +173,7 @@ impl From<Task> for TaskDump {
enqueued_at: task.enqueued_at,
started_at: task.started_at,
finished_at: task.finished_at,
network: task.network,
}
}
}
@@ -250,11 +253,12 @@ pub(crate) mod test {
use maplit::{btreemap, btreeset};
use meilisearch_types::batches::{Batch, BatchEnqueuedAt, BatchStats};
use meilisearch_types::facet_values_sort::FacetValuesSort;
use meilisearch_types::features::{Network, Remote, RuntimeTogglableFeatures};
use meilisearch_types::features::RuntimeTogglableFeatures;
use meilisearch_types::index_uid_pattern::IndexUidPattern;
use meilisearch_types::keys::{Action, Key};
use meilisearch_types::milli::update::Setting;
use meilisearch_types::milli::{self, FilterableAttributesRule};
use meilisearch_types::network::{Network, Remote};
use meilisearch_types::settings::{Checked, FacetingSettings, Settings};
use meilisearch_types::task_view::DetailsView;
use meilisearch_types::tasks::{BatchStopReason, Details, Kind, Status};
@@ -383,6 +387,7 @@ pub(crate) mod test {
enqueued_at: datetime!(2022-11-11 0:00 UTC),
started_at: Some(datetime!(2022-11-20 0:00 UTC)),
finished_at: Some(datetime!(2022-11-21 0:00 UTC)),
network: None,
},
None,
),
@@ -407,6 +412,7 @@ pub(crate) mod test {
enqueued_at: datetime!(2022-11-11 0:00 UTC),
started_at: None,
finished_at: None,
network: None,
},
Some(vec![
json!({ "id": 4, "race": "leonberg" }).as_object().unwrap().clone(),
@@ -426,6 +432,7 @@ pub(crate) mod test {
enqueued_at: datetime!(2022-11-15 0:00 UTC),
started_at: None,
finished_at: None,
network: None,
},
None,
),
@@ -538,7 +545,8 @@ pub(crate) mod test {
fn create_test_network() -> Network {
Network {
local: Some("myself".to_string()),
remotes: maplit::btreemap! {"other".to_string() => Remote { url: "http://test".to_string(), search_api_key: Some("apiKey".to_string()) }},
remotes: maplit::btreemap! {"other".to_string() => Remote { url: "http://test".to_string(), search_api_key: Some("apiKey".to_string()), write_api_key: Some("docApiKey".to_string()) }},
sharding: false,
}
}

View File

@@ -161,6 +161,7 @@ impl CompatV5ToV6 {
enqueued_at: task_view.enqueued_at,
started_at: task_view.started_at,
finished_at: task_view.finished_at,
network: None,
};
(task, content_file)
@@ -202,10 +203,6 @@ impl CompatV5ToV6 {
pub fn network(&self) -> Result<Option<&v6::Network>> {
Ok(None)
}
pub fn webhooks(&self) -> Option<&v6::Webhooks> {
None
}
}
pub enum CompatIndexV5ToV6 {

View File

@@ -138,13 +138,6 @@ impl DumpReader {
DumpReader::Compat(compat) => compat.network(),
}
}
pub fn webhooks(&self) -> Option<&v6::Webhooks> {
match self {
DumpReader::Current(current) => current.webhooks(),
DumpReader::Compat(compat) => compat.webhooks(),
}
}
}
impl From<V6Reader> for DumpReader {
@@ -372,7 +365,6 @@ pub(crate) mod test {
assert_eq!(dump.features().unwrap().unwrap(), RuntimeTogglableFeatures::default());
assert_eq!(dump.network().unwrap(), None);
assert_eq!(dump.webhooks(), None);
}
#[test]
@@ -443,43 +435,6 @@ pub(crate) mod test {
insta::assert_snapshot!(network.remotes.get("ms-2").as_ref().unwrap().search_api_key.as_ref().unwrap(), @"foo");
}
#[test]
fn import_dump_v6_webhooks() {
let dump = File::open("tests/assets/v6-with-webhooks.dump").unwrap();
let dump = DumpReader::open(dump).unwrap();
// top level infos
insta::assert_snapshot!(dump.date().unwrap(), @"2025-07-31 9:21:30.479544 +00:00:00");
insta::assert_debug_snapshot!(dump.instance_uid().unwrap(), @r"
Some(
cb887dcc-34b3-48d1-addd-9815ae721a81,
)
");
// webhooks
let webhooks = dump.webhooks().unwrap();
insta::assert_json_snapshot!(webhooks, @r#"
{
"webhooks": {
"627ea538-733d-4545-8d2d-03526eb381ce": {
"url": "https://example.com/authorization-less",
"headers": {}
},
"771b0a28-ef28-4082-b984-536f82958c65": {
"url": "https://example.com/hook",
"headers": {
"authorization": "TOKEN"
}
},
"f3583083-f8a7-4cbf-a5e7-fb3f1e28a7e9": {
"url": "https://third.com",
"headers": {}
}
}
}
"#);
}
#[test]
fn import_dump_v5() {
let dump = File::open("tests/assets/v5.dump").unwrap();

View File

@@ -24,8 +24,7 @@ pub type Batch = meilisearch_types::batches::Batch;
pub type Key = meilisearch_types::keys::Key;
pub type ChatCompletionSettings = meilisearch_types::features::ChatCompletionSettings;
pub type RuntimeTogglableFeatures = meilisearch_types::features::RuntimeTogglableFeatures;
pub type Network = meilisearch_types::features::Network;
pub type Webhooks = meilisearch_types::webhooks::WebhooksDumpView;
pub type Network = meilisearch_types::network::Network;
// ===== Other types to clarify the code of the compat module
// everything related to the tasks
@@ -60,7 +59,6 @@ pub struct V6Reader {
keys: BufReader<File>,
features: Option<RuntimeTogglableFeatures>,
network: Option<Network>,
webhooks: Option<Webhooks>,
}
impl V6Reader {
@@ -95,8 +93,8 @@ impl V6Reader {
Err(e) => return Err(e.into()),
};
let network = match fs::read(dump.path().join("network.json")) {
Ok(network_file) => Some(serde_json::from_reader(&*network_file)?),
let network_file = match fs::read(dump.path().join("network.json")) {
Ok(network_file) => Some(network_file),
Err(error) => match error.kind() {
// Allows the file to be missing, this will only result in all experimental features disabled.
ErrorKind::NotFound => {
@@ -106,16 +104,10 @@ impl V6Reader {
_ => return Err(error.into()),
},
};
let webhooks = match fs::read(dump.path().join("webhooks.json")) {
Ok(webhooks_file) => Some(serde_json::from_reader(&*webhooks_file)?),
Err(error) => match error.kind() {
ErrorKind::NotFound => {
debug!("`webhooks.json` not found in dump");
None
}
_ => return Err(error.into()),
},
let network = if let Some(network_file) = network_file {
Some(serde_json::from_reader(&*network_file)?)
} else {
None
};
Ok(V6Reader {
@@ -127,7 +119,6 @@ impl V6Reader {
features,
network,
dump,
webhooks,
})
}
@@ -238,10 +229,6 @@ impl V6Reader {
pub fn network(&self) -> Option<&Network> {
self.network.as_ref()
}
pub fn webhooks(&self) -> Option<&Webhooks> {
self.webhooks.as_ref()
}
}
pub struct UpdateFile {

View File

@@ -5,10 +5,10 @@ use std::path::PathBuf;
use flate2::write::GzEncoder;
use flate2::Compression;
use meilisearch_types::batches::Batch;
use meilisearch_types::features::{ChatCompletionSettings, Network, RuntimeTogglableFeatures};
use meilisearch_types::features::{ChatCompletionSettings, RuntimeTogglableFeatures};
use meilisearch_types::keys::Key;
use meilisearch_types::network::Network;
use meilisearch_types::settings::{Checked, Settings};
use meilisearch_types::webhooks::WebhooksDumpView;
use serde_json::{Map, Value};
use tempfile::TempDir;
use time::OffsetDateTime;
@@ -75,13 +75,6 @@ impl DumpWriter {
Ok(std::fs::write(self.dir.path().join("network.json"), serde_json::to_string(&network)?)?)
}
pub fn create_webhooks(&self, webhooks: WebhooksDumpView) -> Result<()> {
Ok(std::fs::write(
self.dir.path().join("webhooks.json"),
serde_json::to_string(&webhooks)?,
)?)
}
pub fn persist_to(self, mut writer: impl Write) -> Result<()> {
let gz_encoder = GzEncoder::new(&mut writer, Compression::default());
let mut tar_encoder = tar::Builder::new(gz_encoder);

View File

@@ -148,11 +148,10 @@ impl File {
Ok(Self { path: PathBuf::new(), file: None })
}
pub fn persist(self) -> Result<()> {
if let Some(file) = self.file {
file.persist(&self.path)?;
}
Ok(())
pub fn persist(self) -> Result<Option<StdFile>> {
let Some(file) = self.file else { return Ok(None) };
Ok(Some(file.persist(&self.path)?))
}
}

View File

@@ -165,9 +165,9 @@ impl<'a> FilterCondition<'a> {
| Condition::Exists
| Condition::LowerThan(_)
| Condition::LowerThanOrEqual(_)
| Condition::Between { .. }
| Condition::StartsWith { .. } => None,
Condition::Contains { keyword, word: _ } => Some(keyword),
| Condition::Between { .. } => None,
Condition::Contains { keyword, word: _ }
| Condition::StartsWith { keyword, word: _ } => Some(keyword),
},
FilterCondition::Not(this) => this.use_contains_operator(),
FilterCondition::Or(seq) | FilterCondition::And(seq) => {

View File

@@ -129,6 +129,7 @@ fn main() {
&mut new_fields_ids_map,
&|| false,
Progress::default(),
None,
)
.unwrap();

View File

@@ -147,6 +147,7 @@ impl<'a> Dump<'a> {
canceled_by: task.canceled_by,
details: task.details,
status: task.status,
network: task.network,
kind: match task.kind {
KindDump::DocumentImport {
primary_key,

View File

@@ -1,8 +1,9 @@
use std::sync::{Arc, RwLock};
use meilisearch_types::features::{InstanceTogglableFeatures, Network, RuntimeTogglableFeatures};
use meilisearch_types::features::{InstanceTogglableFeatures, RuntimeTogglableFeatures};
use meilisearch_types::heed::types::{SerdeJson, Str};
use meilisearch_types::heed::{Database, Env, RwTxn, WithoutTls};
use meilisearch_types::network::Network;
use crate::error::FeatureNotEnabledError;
use crate::Result;
@@ -85,7 +86,7 @@ impl RoFeatures {
Ok(())
} else {
Err(FeatureNotEnabledError {
disabled_action: "Using `CONTAINS` in a filter",
disabled_action: "Using `CONTAINS` or `STARTS WITH` in a filter",
feature: "contains filter",
issue_link: "https://github.com/orgs/meilisearch/discussions/763",
}
@@ -182,7 +183,6 @@ impl FeatureData {
..persisted_features
}));
// Once this is stabilized, network should be stored along with webhooks in index-scheduler's persisted database
let network_db = runtime_features_db.remap_data_type::<SerdeJson<Network>>();
let network: Network = network_db.get(wtxn, db_keys::NETWORK)?.unwrap_or_default();

View File

@@ -71,7 +71,7 @@ pub struct IndexMapper {
/// Path to the folder where the LMDB environments of each index are.
base_path: PathBuf,
/// The map size an index is opened with on the first time.
pub(crate) index_base_map_size: usize,
index_base_map_size: usize,
/// The quantity by which the map size of an index is incremented upon reopening, in bytes.
index_growth_amount: usize,
/// Whether we open a meilisearch index with the MDB_WRITEMAP option or not.

View File

@@ -26,11 +26,11 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
version,
queue,
scheduler,
persisted,
index_mapper,
features: _,
webhooks: _,
webhook_url: _,
webhook_authorization_header: _,
test_breakpoint_sdr: _,
planned_failures: _,
run_loop_iteration: _,
@@ -62,13 +62,6 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
}
snap.push_str("\n----------------------------------------------------------------------\n");
let persisted_db_snapshot = snapshot_persisted_db(&rtxn, persisted);
if !persisted_db_snapshot.is_empty() {
snap.push_str("### Persisted:\n");
snap.push_str(&persisted_db_snapshot);
snap.push_str("----------------------------------------------------------------------\n");
}
snap.push_str("### All Tasks:\n");
snap.push_str(&snapshot_all_tasks(&rtxn, queue.tasks.all_tasks));
snap.push_str("----------------------------------------------------------------------\n");
@@ -207,16 +200,6 @@ pub fn snapshot_date_db(rtxn: &RoTxn, db: Database<BEI128, CboRoaringBitmapCodec
snap
}
pub fn snapshot_persisted_db(rtxn: &RoTxn, db: &Database<Str, Str>) -> String {
let mut snap = String::new();
let iter = db.iter(rtxn).unwrap();
for next in iter {
let (key, value) = next.unwrap();
snap.push_str(&format!("{key}: {value}\n"));
}
snap
}
pub fn snapshot_task(task: &Task) -> String {
let mut snap = String::new();
let Task {
@@ -230,6 +213,7 @@ pub fn snapshot_task(task: &Task) -> String {
details,
status,
kind,
network,
} = task;
snap.push('{');
snap.push_str(&format!("uid: {uid}, "));
@@ -247,6 +231,9 @@ pub fn snapshot_task(task: &Task) -> String {
snap.push_str(&format!("details: {}, ", &snapshot_details(details)));
}
snap.push_str(&format!("kind: {kind:?}"));
if let Some(network) = network {
snap.push_str(&format!("network: {network:?}, "))
}
snap.push('}');
snap
@@ -328,7 +315,6 @@ pub fn snapshot_status(
}
snap
}
pub fn snapshot_kind(rtxn: &RoTxn, db: Database<SerdeBincode<Kind>, RoaringBitmapCodec>) -> String {
let mut snap = String::new();
let iter = db.iter(rtxn).unwrap();
@@ -349,7 +335,6 @@ pub fn snapshot_index_tasks(rtxn: &RoTxn, db: Database<Str, RoaringBitmapCodec>)
}
snap
}
pub fn snapshot_canceled_by(rtxn: &RoTxn, db: Database<BEU32, RoaringBitmapCodec>) -> String {
let mut snap = String::new();
let iter = db.iter(rtxn).unwrap();

View File

@@ -52,7 +52,7 @@ use flate2::bufread::GzEncoder;
use flate2::Compression;
use meilisearch_types::batches::Batch;
use meilisearch_types::features::{
ChatCompletionSettings, InstanceTogglableFeatures, Network, RuntimeTogglableFeatures,
ChatCompletionSettings, InstanceTogglableFeatures, RuntimeTogglableFeatures,
};
use meilisearch_types::heed::byteorder::BE;
use meilisearch_types::heed::types::{DecodeIgnore, SerdeJson, Str, I128};
@@ -63,18 +63,16 @@ use meilisearch_types::milli::vector::{
Embedder, EmbedderOptions, RuntimeEmbedder, RuntimeEmbedders, RuntimeFragment,
};
use meilisearch_types::milli::{self, Index};
use meilisearch_types::network::Network;
use meilisearch_types::task_view::TaskView;
use meilisearch_types::tasks::{KindWithContent, Task};
use meilisearch_types::webhooks::{Webhook, WebhooksDumpView, WebhooksView};
use meilisearch_types::tasks::{KindWithContent, Task, TaskNetwork};
use milli::vector::db::IndexEmbeddingConfig;
use processing::ProcessingTasks;
pub use queue::Query;
use queue::Queue;
use roaring::RoaringBitmap;
use scheduler::Scheduler;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use uuid::Uuid;
use versioning::Versioning;
use crate::index_mapper::IndexMapper;
@@ -83,15 +81,7 @@ use crate::utils::clamp_to_page_size;
pub(crate) type BEI128 = I128<BE>;
const TASK_SCHEDULER_SIZE_THRESHOLD_PERCENT_INT: u64 = 40;
mod db_name {
pub const CHAT_SETTINGS: &str = "chat-settings";
pub const PERSISTED: &str = "persisted";
}
mod db_keys {
pub const WEBHOOKS: &str = "webhooks";
}
const CHAT_SETTINGS_DB_NAME: &str = "chat-settings";
#[derive(Debug)]
pub struct IndexSchedulerOptions {
@@ -109,10 +99,10 @@ pub struct IndexSchedulerOptions {
pub snapshots_path: PathBuf,
/// The path to the folder containing the dumps.
pub dumps_path: PathBuf,
/// The webhook url that was set by the CLI.
pub cli_webhook_url: Option<String>,
/// The Authorization header to send to the webhook URL that was set by the CLI.
pub cli_webhook_authorization: Option<String>,
/// The URL on which we must send the tasks statuses
pub webhook_url: Option<String>,
/// The value we will send into the Authorization HTTP header on the webhook URL
pub webhook_authorization_header: Option<String>,
/// The maximum size, in bytes, of the task index.
pub task_db_size: usize,
/// The size, in bytes, with which a meilisearch index is opened the first time of each meilisearch index.
@@ -182,11 +172,10 @@ pub struct IndexScheduler {
/// Whether we should use the old document indexer or the new one.
pub(crate) experimental_no_edition_2024_for_dumps: bool,
/// A database to store single-keyed data that is persisted across restarts.
persisted: Database<Str, Str>,
/// Webhook, loaded and stored in the `persisted` database
webhooks: Arc<Webhooks>,
/// The webhook url we should send tasks to after processing every batches.
pub(crate) webhook_url: Option<String>,
/// The Authorization header to send to the webhook URL.
pub(crate) webhook_authorization_header: Option<String>,
/// A map to retrieve the runtime representation of an embedder depending on its configuration.
///
@@ -226,9 +215,8 @@ impl IndexScheduler {
index_mapper: self.index_mapper.clone(),
cleanup_enabled: self.cleanup_enabled,
experimental_no_edition_2024_for_dumps: self.experimental_no_edition_2024_for_dumps,
persisted: self.persisted,
webhooks: self.webhooks.clone(),
webhook_url: self.webhook_url.clone(),
webhook_authorization_header: self.webhook_authorization_header.clone(),
embedders: self.embedders.clone(),
#[cfg(test)]
test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
@@ -247,7 +235,6 @@ impl IndexScheduler {
+ IndexMapper::nb_db()
+ features::FeatureData::nb_db()
+ 1 // chat-prompts
+ 1 // persisted
}
/// Create an index scheduler and start its run loop.
@@ -298,18 +285,10 @@ impl IndexScheduler {
let version = versioning::Versioning::new(&env, from_db_version)?;
let mut wtxn = env.write_txn()?;
let features = features::FeatureData::new(&env, &mut wtxn, options.instance_features)?;
let queue = Queue::new(&env, &mut wtxn, &options)?;
let index_mapper = IndexMapper::new(&env, &mut wtxn, &options, budget)?;
let chat_settings = env.create_database(&mut wtxn, Some(db_name::CHAT_SETTINGS))?;
let persisted = env.create_database(&mut wtxn, Some(db_name::PERSISTED))?;
let webhooks_db = persisted.remap_data_type::<SerdeJson<Webhooks>>();
let mut webhooks = webhooks_db.get(&wtxn, db_keys::WEBHOOKS)?.unwrap_or_default();
webhooks
.with_cli(options.cli_webhook_url.clone(), options.cli_webhook_authorization.clone());
let chat_settings = env.create_database(&mut wtxn, Some(CHAT_SETTINGS_DB_NAME))?;
wtxn.commit()?;
// allow unreachable_code to get rids of the warning in the case of a test build.
@@ -325,8 +304,8 @@ impl IndexScheduler {
experimental_no_edition_2024_for_dumps: options
.indexer_config
.experimental_no_edition_2024_for_dumps,
persisted,
webhooks: Arc::new(webhooks),
webhook_url: options.webhook_url,
webhook_authorization_header: options.webhook_authorization_header,
embedders: Default::default(),
#[cfg(test)]
@@ -666,6 +645,16 @@ impl IndexScheduler {
self.queue.get_task_ids_from_authorized_indexes(&rtxn, query, filters, &processing)
}
pub fn set_task_network(&self, task_id: TaskId, network: TaskNetwork) -> Result<()> {
let mut wtxn = self.env.write_txn()?;
let mut task =
self.queue.tasks.get_task(&wtxn, task_id)?.ok_or(Error::TaskNotFound(task_id))?;
task.network = Some(network);
self.queue.tasks.all_tasks.put(&mut wtxn, &task_id, &task)?;
wtxn.commit()?;
Ok(())
}
/// Return the batches matching the query from the user's point of view along
/// with the total number of batches matching the query, ignoring from and limit.
///
@@ -774,92 +763,86 @@ impl IndexScheduler {
Ok(())
}
/// Once the tasks changes have been committed we must send all the tasks that were updated to our webhooks
fn notify_webhooks(&self, updated: RoaringBitmap) {
struct TaskReader<'a, 'b> {
rtxn: &'a RoTxn<'a>,
index_scheduler: &'a IndexScheduler,
tasks: &'b mut roaring::bitmap::Iter<'b>,
buffer: Vec<u8>,
written: usize,
}
/// Once the tasks changes have been committed we must send all the tasks that were updated to our webhook if there is one.
fn notify_webhook(&self, updated: &RoaringBitmap) -> Result<()> {
if let Some(ref url) = self.webhook_url {
struct TaskReader<'a, 'b> {
rtxn: &'a RoTxn<'a>,
index_scheduler: &'a IndexScheduler,
tasks: &'b mut roaring::bitmap::Iter<'b>,
buffer: Vec<u8>,
written: usize,
}
impl Read for TaskReader<'_, '_> {
fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
if self.buffer.is_empty() {
match self.tasks.next() {
None => return Ok(0),
Some(task_id) => {
let task = self
.index_scheduler
.queue
.tasks
.get_task(self.rtxn, task_id)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?
.ok_or_else(|| {
io::Error::new(io::ErrorKind::Other, Error::CorruptedTaskQueue)
})?;
impl Read for TaskReader<'_, '_> {
fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
if self.buffer.is_empty() {
match self.tasks.next() {
None => return Ok(0),
Some(task_id) => {
let task = self
.index_scheduler
.queue
.tasks
.get_task(self.rtxn, task_id)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?
.ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
Error::CorruptedTaskQueue,
)
})?;
serde_json::to_writer(&mut self.buffer, &TaskView::from_task(&task))?;
self.buffer.push(b'\n');
serde_json::to_writer(
&mut self.buffer,
&TaskView::from_task(&task),
)?;
self.buffer.push(b'\n');
}
}
}
let mut to_write = &self.buffer[self.written..];
let wrote = io::copy(&mut to_write, &mut buf)?;
self.written += wrote as usize;
// we wrote everything and must refresh our buffer on the next call
if self.written == self.buffer.len() {
self.written = 0;
self.buffer.clear();
}
Ok(wrote as usize)
}
}
let mut to_write = &self.buffer[self.written..];
let wrote = io::copy(&mut to_write, &mut buf)?;
self.written += wrote as usize;
let rtxn = self.env.read_txn()?;
// we wrote everything and must refresh our buffer on the next call
if self.written == self.buffer.len() {
self.written = 0;
self.buffer.clear();
}
let task_reader = TaskReader {
rtxn: &rtxn,
index_scheduler: self,
tasks: &mut updated.into_iter(),
buffer: Vec::with_capacity(50), // on average a task is around ~100 bytes
written: 0,
};
Ok(wrote as usize)
// let reader = GzEncoder::new(BufReader::new(task_reader), Compression::default());
let reader = GzEncoder::new(BufReader::new(task_reader), Compression::default());
let request = ureq::post(url)
.timeout(Duration::from_secs(30))
.set("Content-Encoding", "gzip")
.set("Content-Type", "application/x-ndjson");
let request = match &self.webhook_authorization_header {
Some(header) => request.set("Authorization", header),
None => request,
};
if let Err(e) = request.send(reader) {
tracing::error!("While sending data to the webhook: {e}");
}
}
let webhooks = self.webhooks.get_all();
if webhooks.is_empty() {
return;
}
let this = self.private_clone();
// We must take the RoTxn before entering the thread::spawn otherwise another batch may be
// processed before we had the time to take our txn.
let rtxn = match self.env.clone().static_read_txn() {
Ok(rtxn) => rtxn,
Err(e) => {
tracing::error!("Couldn't get an rtxn to notify the webhook: {e}");
return;
}
};
std::thread::spawn(move || {
for (uuid, Webhook { url, headers }) in webhooks.iter() {
let task_reader = TaskReader {
rtxn: &rtxn,
index_scheduler: &this,
tasks: &mut updated.iter(),
buffer: Vec::with_capacity(page_size::get()),
written: 0,
};
let reader = GzEncoder::new(BufReader::new(task_reader), Compression::default());
let mut request = ureq::post(url)
.timeout(Duration::from_secs(30))
.set("Content-Encoding", "gzip")
.set("Content-Type", "application/x-ndjson");
for (header_name, header_value) in headers.iter() {
request = request.set(header_name, header_value);
}
if let Err(e) = request.send(reader) {
tracing::error!("While sending data to the webhook {uuid}: {e}");
}
}
});
Ok(())
}
pub fn index_stats(&self, index_uid: &str) -> Result<IndexStats> {
@@ -890,29 +873,6 @@ impl IndexScheduler {
self.features.network()
}
pub fn update_runtime_webhooks(&self, runtime: RuntimeWebhooks) -> Result<()> {
let webhooks = Webhooks::from_runtime(runtime);
let mut wtxn = self.env.write_txn()?;
let webhooks_db = self.persisted.remap_data_type::<SerdeJson<Webhooks>>();
webhooks_db.put(&mut wtxn, db_keys::WEBHOOKS, &webhooks)?;
wtxn.commit()?;
self.webhooks.update_runtime(webhooks.into_runtime());
Ok(())
}
pub fn webhooks_dump_view(&self) -> WebhooksDumpView {
// We must not dump the cli api key
WebhooksDumpView { webhooks: self.webhooks.get_runtime() }
}
pub fn webhooks_view(&self) -> WebhooksView {
WebhooksView { webhooks: self.webhooks.get_all() }
}
pub fn retrieve_runtime_webhooks(&self) -> RuntimeWebhooks {
self.webhooks.get_runtime()
}
pub fn embedders(
&self,
index_uid: String,
@@ -1041,72 +1001,3 @@ pub struct IndexStats {
/// Internal stats computed from the index.
pub inner_stats: index_mapper::IndexStats,
}
/// These structure are not meant to be exposed to the end user, if needed, use the meilisearch-types::webhooks structure instead.
/// /!\ Everytime you deserialize this structure you should fill the cli_webhook later on with the `with_cli` method. /!\
#[derive(Debug, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
struct Webhooks {
// The cli webhook should *never* be stored in a database.
// It represent a state that only exists for this execution of meilisearch
#[serde(skip)]
pub cli: Option<CliWebhook>,
#[serde(default)]
pub runtime: RwLock<RuntimeWebhooks>,
}
type RuntimeWebhooks = BTreeMap<Uuid, Webhook>;
impl Webhooks {
pub fn with_cli(&mut self, url: Option<String>, auth: Option<String>) {
if let Some(url) = url {
let webhook = CliWebhook { url, auth };
self.cli = Some(webhook);
}
}
pub fn from_runtime(webhooks: RuntimeWebhooks) -> Self {
Self { cli: None, runtime: RwLock::new(webhooks) }
}
pub fn into_runtime(self) -> RuntimeWebhooks {
// safe because we own self and it cannot be cloned
self.runtime.into_inner().unwrap()
}
pub fn update_runtime(&self, webhooks: RuntimeWebhooks) {
*self.runtime.write().unwrap() = webhooks;
}
/// Returns all the webhooks in an unified view. The cli webhook is represented with an uuid set to 0
pub fn get_all(&self) -> BTreeMap<Uuid, Webhook> {
self.cli
.as_ref()
.map(|wh| (Uuid::nil(), Webhook::from(wh)))
.into_iter()
.chain(self.runtime.read().unwrap().iter().map(|(uuid, wh)| (*uuid, wh.clone())))
.collect()
}
/// Returns all the runtime webhooks.
pub fn get_runtime(&self) -> BTreeMap<Uuid, Webhook> {
self.runtime.read().unwrap().iter().map(|(uuid, wh)| (*uuid, wh.clone())).collect()
}
}
#[derive(Debug, Serialize, Deserialize, Default, Clone, PartialEq)]
struct CliWebhook {
pub url: String,
pub auth: Option<String>,
}
impl From<&CliWebhook> for Webhook {
fn from(webhook: &CliWebhook) -> Self {
let mut headers = BTreeMap::new();
if let Some(ref auth) = webhook.auth {
headers.insert("Authorization".to_string(), auth.to_string());
}
Self { url: webhook.url.to_string(), headers }
}
}

View File

@@ -108,7 +108,6 @@ make_enum_progress! {
DumpTheBatches,
DumpTheIndexes,
DumpTheExperimentalFeatures,
DumpTheWebhooks,
CompressTheDump,
}
}

View File

@@ -279,6 +279,7 @@ impl Queue {
details: kind.default_details(),
status: Status::Enqueued,
kind: kind.clone(),
network: None,
};
// For deletion and cancelation tasks, we want to make extra sure that they
// don't attempt to delete/cancel tasks that are newer than themselves.

View File

@@ -97,7 +97,22 @@ impl TaskQueue {
Ok(self.all_tasks.get(rtxn, &task_id)?)
}
pub(crate) fn update_task(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> {
/// Update the inverted task indexes and write the new value of the task.
///
/// The passed `task` object typically comes from a previous transaction, so two kinds of modification might have occurred:
/// 1. Modification to the `task` object after loading it from the DB (the purpose of this method is to persist these changes)
/// 2. Modification to the task committed by another transaction in the DB (an annoying consequence of having lost the original
/// transaction from which the `task` instance was deserialized)
///
/// When calling this function, this `task` is modified to take into account any existing `network`
/// that can have been added since the task was loaded into memory.
///
/// Any other modification to the task that was committed from the DB since the parameter was pulled from the DB will be overwritten.
///
/// # Errors
///
/// - CorruptedTaskQueue: The task doesn't exist in the database
pub(crate) fn update_task(&self, wtxn: &mut RwTxn, task: &mut Task) -> Result<()> {
let old_task = self.get_task(wtxn, task.uid)?.ok_or(Error::CorruptedTaskQueue)?;
let reprocessing = old_task.status != Status::Enqueued;
@@ -157,6 +172,12 @@ impl TaskQueue {
}
}
task.network = match (old_task.network, task.network.take()) {
(None, None) => None,
(None, Some(network)) | (Some(network), None) => Some(network),
(Some(_), Some(network)) => Some(network),
};
self.all_tasks.put(wtxn, &task.uid, task)?;
Ok(())
}

View File

@@ -268,7 +268,7 @@ impl IndexScheduler {
self.queue
.tasks
.update_task(&mut wtxn, &task)
.update_task(&mut wtxn, &mut task)
.map_err(|e| Error::UnrecoverableError(Box::new(e)))?;
}
if let Some(canceled_by) = canceled_by {
@@ -349,7 +349,7 @@ impl IndexScheduler {
self.queue
.tasks
.update_task(&mut wtxn, &task)
.update_task(&mut wtxn, &mut task)
.map_err(|e| Error::UnrecoverableError(Box::new(e)))?;
}
}
@@ -446,7 +446,8 @@ impl IndexScheduler {
Ok(())
})?;
self.notify_webhooks(ids);
// We shouldn't crash the tick function if we can't send data to the webhook.
let _ = self.notify_webhook(&ids);
#[cfg(test)]
self.breakpoint(crate::test_utils::Breakpoint::AfterProcessing);

View File

@@ -270,11 +270,6 @@ impl IndexScheduler {
let network = self.network();
dump.create_network(network)?;
// 7. Dump the webhooks
progress.update_progress(DumpCreationProgress::DumpTheWebhooks);
let webhooks = self.webhooks_dump_view();
dump.create_webhooks(webhooks)?;
let dump_uid = started_at.format(format_description!(
"[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]"
)).unwrap();

View File

@@ -66,6 +66,11 @@ impl IndexScheduler {
}
IndexOperation::DocumentOperation { index_uid, primary_key, operations, mut tasks } => {
progress.update_progress(DocumentOperationProgress::RetrievingConfig);
let network = self.network();
let shards = network.shards();
// TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches.
// this is made difficult by the fact we're doing private clones of the index scheduler and sending it
// to a fresh thread.
@@ -130,6 +135,7 @@ impl IndexScheduler {
&mut new_fields_ids_map,
&|| must_stop_processing.get(),
progress.clone(),
shards.as_ref(),
)
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?;

View File

@@ -7,73 +7,9 @@ use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::tasks::{Status, Task};
use meilisearch_types::{compression, VERSION_FILE_NAME};
use crate::heed::EnvOpenOptions;
use crate::processing::{AtomicUpdateFileStep, SnapshotCreationProgress};
use crate::queue::TaskQueue;
use crate::{Error, IndexScheduler, Result};
/// # Safety
///
/// See [`EnvOpenOptions::open`].
unsafe fn remove_tasks(
tasks: &[Task],
dst: &std::path::Path,
index_base_map_size: usize,
) -> Result<()> {
let env_options = EnvOpenOptions::new();
let mut env_options = env_options.read_txn_without_tls();
let env = env_options.max_dbs(TaskQueue::nb_db()).map_size(index_base_map_size).open(dst)?;
let mut wtxn = env.write_txn()?;
let task_queue = TaskQueue::new(&env, &mut wtxn)?;
// Destructuring to ensure the code below gets updated if a database gets added in the future.
let TaskQueue {
all_tasks,
status,
kind,
index_tasks: _, // snapshot creation tasks are not index tasks
canceled_by,
enqueued_at,
started_at,
finished_at,
} = task_queue;
for task in tasks {
all_tasks.delete(&mut wtxn, &task.uid)?;
let mut tasks = status.get(&wtxn, &task.status)?.unwrap_or_default();
tasks.remove(task.uid);
status.put(&mut wtxn, &task.status, &tasks)?;
let mut tasks = kind.get(&wtxn, &task.kind.as_kind())?.unwrap_or_default();
tasks.remove(task.uid);
kind.put(&mut wtxn, &task.kind.as_kind(), &tasks)?;
canceled_by.delete(&mut wtxn, &task.uid)?;
let timestamp = task.enqueued_at.unix_timestamp_nanos();
let mut tasks = enqueued_at.get(&wtxn, &timestamp)?.unwrap_or_default();
tasks.remove(task.uid);
enqueued_at.put(&mut wtxn, &timestamp, &tasks)?;
if let Some(task_started_at) = task.started_at {
let timestamp = task_started_at.unix_timestamp_nanos();
let mut tasks = started_at.get(&wtxn, &timestamp)?.unwrap_or_default();
tasks.remove(task.uid);
started_at.put(&mut wtxn, &timestamp, &tasks)?;
}
if let Some(task_finished_at) = task.finished_at {
let timestamp = task_finished_at.unix_timestamp_nanos();
let mut tasks = finished_at.get(&wtxn, &timestamp)?.unwrap_or_default();
tasks.remove(task.uid);
finished_at.put(&mut wtxn, &timestamp, &tasks)?;
}
}
wtxn.commit()?;
Ok(())
}
impl IndexScheduler {
pub(super) fn process_snapshot(
&self,
@@ -112,26 +48,14 @@ impl IndexScheduler {
};
self.env.copy_to_path(dst.join("data.mdb"), compaction_option)?;
// 2.2 Remove the current snapshot tasks
//
// This is done to ensure that the tasks are not processed again when the snapshot is imported
//
// # Safety
//
// This is safe because we open the env file we just created in a temporary directory.
// We are sure it's not being used by any other process nor thread.
unsafe {
remove_tasks(&tasks, &dst, self.index_mapper.index_base_map_size)?;
}
// 2.3 Create a read transaction on the index-scheduler
// 2.2 Create a read transaction on the index-scheduler
let rtxn = self.env.read_txn()?;
// 2.4 Create the update files directory
// 2.3 Create the update files directory
let update_files_dir = temp_snapshot_dir.path().join("update_files");
fs::create_dir_all(&update_files_dir)?;
// 2.5 Only copy the update files of the enqueued tasks
// 2.4 Only copy the update files of the enqueued tasks
progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles);
let enqueued = self.queue.tasks.get_status(&rtxn, Status::Enqueued)?;
let (atomic, update_file_progress) = AtomicUpdateFileStep::new(enqueued.len() as u32);

View File

@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 0, status: succeeded, details: { from: (1, 12, 0), to: (1, 17, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, batch_uid: 0, status: succeeded, details: { from: (1, 12, 0), to: (1, 16, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
1 {uid: 1, batch_uid: 1, status: succeeded, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
2 {uid: 2, batch_uid: 2, status: succeeded, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
3 {uid: 3, batch_uid: 3, status: failed, error: ResponseError { code: 200, message: "Index `doggo` already exists.", error_code: "index_already_exists", error_type: "invalid_request", error_link: "https://docs.meilisearch.com/errors#index_already_exists" }, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
@@ -57,7 +57,7 @@ girafo: { number_of_documents: 0, field_distribution: {} }
[timestamp] [4,]
----------------------------------------------------------------------
### All Batches:
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.17.0"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.16.0"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
1 {uid: 1, details: {"primaryKey":"mouse"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"indexCreation":1},"indexUids":{"catto":1}}, stop reason: "created batch containing only task with id 1 of type `indexCreation` that cannot be batched with any other task.", }
2 {uid: 2, details: {"primaryKey":"bone"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"indexCreation":1},"indexUids":{"doggo":1}}, stop reason: "created batch containing only task with id 2 of type `indexCreation` that cannot be batched with any other task.", }
3 {uid: 3, details: {"primaryKey":"bone"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"indexCreation":1},"indexUids":{"doggo":1}}, stop reason: "created batch containing only task with id 3 of type `indexCreation` that cannot be batched with any other task.", }

View File

@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { from: (1, 12, 0), to: (1, 17, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, status: enqueued, details: { from: (1, 12, 0), to: (1, 16, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
----------------------------------------------------------------------
### Status:
enqueued [0,]

View File

@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { from: (1, 12, 0), to: (1, 17, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, status: enqueued, details: { from: (1, 12, 0), to: (1, 16, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
1 {uid: 1, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
----------------------------------------------------------------------
### Status:

View File

@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { from: (1, 12, 0), to: (1, 17, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { from: (1, 12, 0), to: (1, 16, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
1 {uid: 1, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
----------------------------------------------------------------------
### Status:
@@ -37,7 +37,7 @@ catto [1,]
[timestamp] [0,]
----------------------------------------------------------------------
### All Batches:
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.17.0"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.16.0"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
----------------------------------------------------------------------
### Batch to tasks mapping:
0 [0,]

View File

@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { from: (1, 12, 0), to: (1, 17, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, batch_uid: 0, status: failed, error: ResponseError { code: 200, message: "Planned failure for tests.", error_code: "internal", error_type: "internal", error_link: "https://docs.meilisearch.com/errors#internal" }, details: { from: (1, 12, 0), to: (1, 16, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
1 {uid: 1, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
2 {uid: 2, status: enqueued, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
----------------------------------------------------------------------
@@ -40,7 +40,7 @@ doggo [2,]
[timestamp] [0,]
----------------------------------------------------------------------
### All Batches:
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.17.0"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.16.0"}, stats: {"totalNbTasks":1,"status":{"failed":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
----------------------------------------------------------------------
### Batch to tasks mapping:
0 [0,]

View File

@@ -6,7 +6,7 @@ source: crates/index-scheduler/src/scheduler/test_failure.rs
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, batch_uid: 0, status: succeeded, details: { from: (1, 12, 0), to: (1, 17, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
0 {uid: 0, batch_uid: 0, status: succeeded, details: { from: (1, 12, 0), to: (1, 16, 0) }, kind: UpgradeDatabase { from: (1, 12, 0) }}
1 {uid: 1, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
2 {uid: 2, status: enqueued, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
3 {uid: 3, status: enqueued, details: { primary_key: Some("bone") }, kind: IndexCreation { index_uid: "doggo", primary_key: Some("bone") }}
@@ -43,7 +43,7 @@ doggo [2,3,]
[timestamp] [0,]
----------------------------------------------------------------------
### All Batches:
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.17.0"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
0 {uid: 0, details: {"upgradeFrom":"v1.12.0","upgradeTo":"v1.16.0"}, stats: {"totalNbTasks":1,"status":{"succeeded":1},"types":{"upgradeDatabase":1},"indexUids":{}}, stop reason: "stopped after the last task of type `upgradeDatabase` because they cannot be batched with tasks of any other type.", }
----------------------------------------------------------------------
### Batch to tasks mapping:
0 [0,]

View File

@@ -98,8 +98,8 @@ impl IndexScheduler {
indexes_path: tempdir.path().join("indexes"),
snapshots_path: tempdir.path().join("snapshots"),
dumps_path: tempdir.path().join("dumps"),
cli_webhook_url: None,
cli_webhook_authorization: None,
webhook_url: None,
webhook_authorization_header: None,
task_db_size: 1000 * 1000 * 10, // 10 MB, we don't use MiB on purpose.
index_base_map_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose.
enable_mdb_writemap: false,

View File

@@ -39,7 +39,6 @@ pub fn upgrade_index_scheduler(
(1, 13, _) => 0,
(1, 14, _) => 0,
(1, 15, _) => 0,
(1, 16, _) => 0,
(major, minor, patch) => {
if major > current_major
|| (major == current_major && minor > current_minor)
@@ -89,6 +88,7 @@ pub fn upgrade_index_scheduler(
details: Some(Details::UpgradeDatabase { from, to }),
status: Status::Enqueued,
kind: KindWithContent::UpgradeDatabase { from },
network: None,
},
)?;
wtxn.commit()?;

View File

@@ -1,6 +1,5 @@
//! Utility functions on the DBs. Mainly getter and setters.
use crate::milli::progress::EmbedderStats;
use std::collections::{BTreeSet, HashSet};
use std::ops::Bound;
use std::sync::Arc;
@@ -15,6 +14,7 @@ use meilisearch_types::tasks::{
use roaring::RoaringBitmap;
use time::OffsetDateTime;
use crate::milli::progress::EmbedderStats;
use crate::{Error, Result, Task, TaskId, BEI128};
/// This structure contains all the information required to write a batch in the database without reading the tasks.
@@ -372,6 +372,7 @@ impl crate::IndexScheduler {
details,
status,
kind,
network: _,
} = task;
assert_eq!(uid, task.uid);
if task.status != Status::Enqueued {

View File

@@ -137,14 +137,6 @@ impl HeedAuthStore {
Action::ChatsSettingsAll => {
actions.extend([Action::ChatsSettingsGet, Action::ChatsSettingsUpdate]);
}
Action::WebhooksAll => {
actions.extend([
Action::WebhooksGet,
Action::WebhooksUpdate,
Action::WebhooksDelete,
Action::WebhooksCreate,
]);
}
other => {
actions.insert(*other);
}

View File

@@ -236,9 +236,11 @@ InvalidDocumentFields , InvalidRequest , BAD_REQU
InvalidDocumentRetrieveVectors , InvalidRequest , BAD_REQUEST ;
MissingDocumentFilter , InvalidRequest , BAD_REQUEST ;
MissingDocumentEditionFunction , InvalidRequest , BAD_REQUEST ;
InconsistentDocumentChangeHeaders , InvalidRequest , BAD_REQUEST ;
InvalidDocumentFilter , InvalidRequest , BAD_REQUEST ;
InvalidDocumentSort , InvalidRequest , BAD_REQUEST ;
InvalidDocumentGeoField , InvalidRequest , BAD_REQUEST ;
InvalidHeaderValue , InvalidRequest , BAD_REQUEST ;
InvalidVectorDimensions , InvalidRequest , BAD_REQUEST ;
InvalidVectorsType , InvalidRequest , BAD_REQUEST ;
InvalidDocumentId , InvalidRequest , BAD_REQUEST ;
@@ -267,7 +269,9 @@ InvalidMultiSearchRemote , InvalidRequest , BAD_REQU
InvalidMultiSearchWeight , InvalidRequest , BAD_REQUEST ;
InvalidNetworkRemotes , InvalidRequest , BAD_REQUEST ;
InvalidNetworkSelf , InvalidRequest , BAD_REQUEST ;
InvalidNetworkSharding , InvalidRequest , BAD_REQUEST ;
InvalidNetworkSearchApiKey , InvalidRequest , BAD_REQUEST ;
InvalidNetworkWriteApiKey , InvalidRequest , BAD_REQUEST ;
InvalidNetworkUrl , InvalidRequest , BAD_REQUEST ;
InvalidSearchAttributesToSearchOn , InvalidRequest , BAD_REQUEST ;
InvalidSearchAttributesToCrop , InvalidRequest , BAD_REQUEST ;
@@ -418,16 +422,7 @@ InvalidChatCompletionSearchDescriptionPrompt , InvalidRequest , BAD_REQU
InvalidChatCompletionSearchQueryParamPrompt , InvalidRequest , BAD_REQUEST ;
InvalidChatCompletionSearchFilterParamPrompt , InvalidRequest , BAD_REQUEST ;
InvalidChatCompletionSearchIndexUidParamPrompt , InvalidRequest , BAD_REQUEST ;
InvalidChatCompletionPreQueryPrompt , InvalidRequest , BAD_REQUEST ;
// Webhooks
InvalidWebhooks , InvalidRequest , BAD_REQUEST ;
InvalidWebhookUrl , InvalidRequest , BAD_REQUEST ;
InvalidWebhookHeaders , InvalidRequest , BAD_REQUEST ;
ImmutableWebhook , InvalidRequest , BAD_REQUEST ;
InvalidWebhookUuid , InvalidRequest , BAD_REQUEST ;
WebhookNotFound , InvalidRequest , NOT_FOUND ;
ImmutableWebhookUuid , InvalidRequest , BAD_REQUEST ;
ImmutableWebhookIsEditable , InvalidRequest , BAD_REQUEST
InvalidChatCompletionPreQueryPrompt , InvalidRequest , BAD_REQUEST
}
impl ErrorCode for JoinError {

View File

@@ -1,5 +1,3 @@
use std::collections::BTreeMap;
use serde::{Deserialize, Serialize};
use crate::error::{Code, ResponseError};
@@ -32,23 +30,6 @@ pub struct InstanceTogglableFeatures {
pub contains_filter: bool,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct Remote {
pub url: String,
#[serde(default)]
pub search_api_key: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "camelCase")]
pub struct Network {
#[serde(default, rename = "self")]
pub local: Option<String>,
#[serde(default)]
pub remotes: BTreeMap<String, Remote>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "camelCase")]
pub struct ChatCompletionSettings {

View File

@@ -365,21 +365,6 @@ pub enum Action {
#[serde(rename = "*.get")]
#[deserr(rename = "*.get")]
AllGet,
#[serde(rename = "webhooks.get")]
#[deserr(rename = "webhooks.get")]
WebhooksGet,
#[serde(rename = "webhooks.update")]
#[deserr(rename = "webhooks.update")]
WebhooksUpdate,
#[serde(rename = "webhooks.delete")]
#[deserr(rename = "webhooks.delete")]
WebhooksDelete,
#[serde(rename = "webhooks.create")]
#[deserr(rename = "webhooks.create")]
WebhooksCreate,
#[serde(rename = "webhooks.*")]
#[deserr(rename = "webhooks.*")]
WebhooksAll,
}
impl Action {
@@ -431,11 +416,6 @@ impl Action {
NETWORK_GET => Some(Self::NetworkGet),
NETWORK_UPDATE => Some(Self::NetworkUpdate),
ALL_GET => Some(Self::AllGet),
WEBHOOKS_GET => Some(Self::WebhooksGet),
WEBHOOKS_UPDATE => Some(Self::WebhooksUpdate),
WEBHOOKS_DELETE => Some(Self::WebhooksDelete),
WEBHOOKS_CREATE => Some(Self::WebhooksCreate),
WEBHOOKS_ALL => Some(Self::WebhooksAll),
_otherwise => None,
}
}
@@ -448,9 +428,7 @@ impl Action {
match self {
// Any action that expands to others must return false, as it wouldn't be able to expand recursively.
All | AllGet | DocumentsAll | IndexesAll | ChatsAll | TasksAll | SettingsAll
| StatsAll | MetricsAll | DumpsAll | SnapshotsAll | ChatsSettingsAll | WebhooksAll => {
false
}
| StatsAll | MetricsAll | DumpsAll | SnapshotsAll | ChatsSettingsAll => false,
Search => true,
DocumentsAdd => false,
@@ -485,10 +463,6 @@ impl Action {
ChatsDelete => false,
ChatsSettingsGet => true,
ChatsSettingsUpdate => false,
WebhooksGet => true,
WebhooksUpdate => false,
WebhooksDelete => false,
WebhooksCreate => false,
}
}
@@ -548,12 +522,6 @@ pub mod actions {
pub const CHATS_SETTINGS_ALL: u8 = ChatsSettingsAll.repr();
pub const CHATS_SETTINGS_GET: u8 = ChatsSettingsGet.repr();
pub const CHATS_SETTINGS_UPDATE: u8 = ChatsSettingsUpdate.repr();
pub const WEBHOOKS_GET: u8 = WebhooksGet.repr();
pub const WEBHOOKS_UPDATE: u8 = WebhooksUpdate.repr();
pub const WEBHOOKS_DELETE: u8 = WebhooksDelete.repr();
pub const WEBHOOKS_CREATE: u8 = WebhooksCreate.repr();
pub const WEBHOOKS_ALL: u8 = WebhooksAll.repr();
}
#[cfg(test)]
@@ -609,11 +577,6 @@ pub(crate) mod test {
assert!(ChatsSettingsGet.repr() == 42 && CHATS_SETTINGS_GET == 42);
assert!(ChatsSettingsUpdate.repr() == 43 && CHATS_SETTINGS_UPDATE == 43);
assert!(AllGet.repr() == 44 && ALL_GET == 44);
assert!(WebhooksGet.repr() == 45 && WEBHOOKS_GET == 45);
assert!(WebhooksUpdate.repr() == 46 && WEBHOOKS_UPDATE == 46);
assert!(WebhooksDelete.repr() == 47 && WEBHOOKS_DELETE == 47);
assert!(WebhooksCreate.repr() == 48 && WEBHOOKS_CREATE == 48);
assert!(WebhooksAll.repr() == 49 && WEBHOOKS_ALL == 49);
}
#[test]

View File

@@ -10,12 +10,12 @@ pub mod index_uid;
pub mod index_uid_pattern;
pub mod keys;
pub mod locales;
pub mod network;
pub mod settings;
pub mod star_or;
pub mod task_view;
pub mod tasks;
pub mod versioning;
pub mod webhooks;
pub use milli::{heed, Index};
use uuid::Uuid;
pub use versioning::VERSION_FILE_NAME;

View File

@@ -0,0 +1,47 @@
// Copyright © 2025 Meilisearch Some Rights Reserved
// This file is part of Meilisearch Enterprise Edition (EE).
// Use of this source code is governed by the Business Source License 1.1,
// as found in the LICENSE-EE file or at <https://mariadb.com/bsl11>
use std::collections::BTreeMap;
use milli::update::new::indexer::sharding::Shards;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "camelCase")]
pub struct Network {
#[serde(default, rename = "self")]
pub local: Option<String>,
#[serde(default)]
pub remotes: BTreeMap<String, Remote>,
#[serde(default)]
pub sharding: bool,
}
impl Network {
pub fn shards(&self) -> Option<Shards> {
if self.sharding {
let this = self.local.as_deref().expect("Inconsistent `sharding` and `self`");
let others = self
.remotes
.keys()
.filter(|name| name.as_str() != this)
.map(|name| name.to_owned())
.collect();
Some(Shards { own: vec![this.to_owned()], others })
} else {
None
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct Remote {
pub url: String,
#[serde(default)]
pub search_api_key: Option<String>,
#[serde(default)]
pub write_api_key: Option<String>,
}

View File

@@ -11,6 +11,7 @@ use crate::error::ResponseError;
use crate::settings::{Settings, Unchecked};
use crate::tasks::{
serialize_duration, Details, DetailsExportIndexSettings, IndexSwap, Kind, Status, Task, TaskId,
TaskNetwork,
};
#[derive(Debug, Clone, PartialEq, Serialize, ToSchema)]
@@ -51,6 +52,9 @@ pub struct TaskView {
#[schema(value_type = String, example = json!("2024-08-08_14:12:09.393Z"))]
#[serde(with = "time::serde::rfc3339::option", default)]
pub finished_at: Option<OffsetDateTime>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub network: Option<TaskNetwork>,
}
impl TaskView {
@@ -68,6 +72,7 @@ impl TaskView {
enqueued_at: task.enqueued_at,
started_at: task.started_at,
finished_at: task.finished_at,
network: task.network.clone(),
}
}
}

View File

@@ -42,6 +42,9 @@ pub struct Task {
pub status: Status,
pub kind: KindWithContent,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub network: Option<TaskNetwork>,
}
impl Task {
@@ -704,6 +707,36 @@ pub enum Details {
},
}
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, ToSchema)]
#[serde(untagged, rename_all = "camelCase")]
pub enum TaskNetwork {
Origin { origin: Origin },
Remotes { remote_tasks: BTreeMap<String, RemoteTask> },
}
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, ToSchema)]
#[serde(untagged, rename_all = "camelCase")]
pub struct Origin {
pub remote_name: String,
pub task_uid: usize,
}
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct RemoteTask {
#[serde(skip_serializing_if = "Option::is_none")]
task_uid: Option<TaskId>,
error: Option<ResponseError>,
}
impl From<Result<TaskId, ResponseError>> for RemoteTask {
fn from(res: Result<TaskId, ResponseError>) -> RemoteTask {
match res {
Ok(task_uid) => RemoteTask { task_uid: Some(task_uid), error: None },
Err(err) => RemoteTask { task_uid: None, error: Some(err) },
}
}
}
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, ToSchema)]
#[schema(rename_all = "camelCase")]
pub struct DetailsExportIndexSettings {

View File

@@ -1,28 +0,0 @@
use std::collections::BTreeMap;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Webhook {
pub url: String,
#[serde(default)]
pub headers: BTreeMap<String, String>,
}
#[derive(Debug, Serialize, Default, Clone, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct WebhooksView {
#[serde(default)]
pub webhooks: BTreeMap<Uuid, Webhook>,
}
// Same as the WebhooksView instead it should never contains the CLI webhooks.
// It's the right structure to use in the dump
#[derive(Debug, Deserialize, Serialize, Default, Clone, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct WebhooksDumpView {
#[serde(default)]
pub webhooks: BTreeMap<Uuid, Webhook>,
}

View File

@@ -115,6 +115,9 @@ utoipa-scalar = { version = "0.3.0", optional = true, features = ["actix-web"] }
async-openai = { git = "https://github.com/meilisearch/async-openai", branch = "better-error-handling" }
secrecy = "0.10.3"
actix-web-lab = { version = "0.24.1", default-features = false }
urlencoding = "2.1.3"
backoff = { version = "0.4.0", features = ["tokio"] }
[dev-dependencies]
actix-rt = "2.10.0"
@@ -125,7 +128,6 @@ manifest-dir-macros = "0.1.18"
maplit = "1.0.2"
meili-snap = { path = "../meili-snap" }
temp-env = "0.3.6"
urlencoding = "2.1.3"
wiremock = "0.6.3"
yaup = "0.3.1"
@@ -170,5 +172,5 @@ german = ["meilisearch-types/german"]
turkish = ["meilisearch-types/turkish"]
[package.metadata.mini-dashboard]
assets-url = "https://github.com/meilisearch/mini-dashboard/releases/download/v0.2.22/build.zip"
sha1 = "b70b2036b5f167da9ea0b637da8b320c7ea88254"
assets-url = "https://github.com/meilisearch/mini-dashboard/releases/download/v0.2.20/build.zip"
sha1 = "82a7ddd7bf14bb5323c3d235d2b62892a98b6a59"

View File

@@ -9,6 +9,8 @@ use meilisearch_types::milli::OrderBy;
use serde_json::Value;
use tokio::task::JoinError;
use crate::routes::indexes::{PROXY_ORIGIN_REMOTE_HEADER, PROXY_ORIGIN_TASK_UID_HEADER};
#[derive(Debug, thiserror::Error)]
pub enum MeilisearchHttpError {
#[error("A Content-Type header is missing. Accepted values for the Content-Type header are: {}",
@@ -80,6 +82,16 @@ pub enum MeilisearchHttpError {
MissingSearchHybrid,
#[error("Invalid request: both `media` and `vector` parameters are present.")]
MediaAndVector,
#[error("Inconsistent `Origin` headers: {} was provided but {} is missing.\n - Hint: Either both headers should be provided, or none of them", if *is_remote_missing {
PROXY_ORIGIN_TASK_UID_HEADER
} else { PROXY_ORIGIN_REMOTE_HEADER },
if *is_remote_missing {
PROXY_ORIGIN_REMOTE_HEADER
} else { PROXY_ORIGIN_TASK_UID_HEADER }
)]
InconsistentOriginHeaders { is_remote_missing: bool },
#[error("Invalid value for header {header_name}: {msg}")]
InvalidHeaderValue { header_name: &'static str, msg: String },
}
impl MeilisearchHttpError {
@@ -124,6 +136,10 @@ impl ErrorCode for MeilisearchHttpError {
MeilisearchHttpError::InconsistentFacetOrder { .. } => {
Code::InvalidMultiSearchFacetOrder
}
MeilisearchHttpError::InconsistentOriginHeaders { .. } => {
Code::InconsistentDocumentChangeHeaders
}
MeilisearchHttpError::InvalidHeaderValue { .. } => Code::InvalidHeaderValue,
}
}
}

View File

@@ -223,8 +223,8 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
indexes_path: opt.db_path.join("indexes"),
snapshots_path: opt.snapshot_dir.clone(),
dumps_path: opt.dump_dir.clone(),
cli_webhook_url: opt.task_webhook_url.as_ref().map(|url| url.to_string()),
cli_webhook_authorization: opt.task_webhook_authorization_header.clone(),
webhook_url: opt.task_webhook_url.as_ref().map(|url| url.to_string()),
webhook_authorization_header: opt.task_webhook_authorization_header.clone(),
task_db_size: opt.max_task_db_size.as_u64() as usize,
index_base_map_size: opt.max_index_size.as_u64() as usize,
enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
@@ -491,12 +491,7 @@ fn import_dump(
let _ = std::fs::write(db_path.join("instance-uid"), instance_uid.to_string().as_bytes());
};
// 2. Import the webhooks
if let Some(webhooks) = dump_reader.webhooks() {
index_scheduler.update_runtime_webhooks(webhooks.webhooks.clone())?;
}
// 3. Import the `Key`s.
// 2. Import the `Key`s.
let mut keys = Vec::new();
auth.raw_delete_all_keys()?;
for key in dump_reader.keys()? {
@@ -505,20 +500,20 @@ fn import_dump(
keys.push(key);
}
// 4. Import the `ChatCompletionSettings`s.
// 3. Import the `ChatCompletionSettings`s.
for result in dump_reader.chat_completions_settings()? {
let (name, settings) = result?;
index_scheduler.put_chat_settings(&name, &settings)?;
}
// 5. Import the runtime features and network
// 4. Import the runtime features and network
let features = dump_reader.features()?.unwrap_or_default();
index_scheduler.put_runtime_features(features)?;
let network = dump_reader.network()?.cloned().unwrap_or_default();
index_scheduler.put_network(network)?;
// 5.1 Use all cpus to process dump if `max_indexing_threads` not configured
// 4.1 Use all cpus to process dump if `max_indexing_threads` not configured
let backup_config;
let base_config = index_scheduler.indexer_config();
@@ -535,7 +530,7 @@ fn import_dump(
// /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might
// try to process tasks while we're trying to import the indexes.
// 6. Import the indexes.
// 5. Import the indexes.
for index_reader in dump_reader.indexes()? {
let mut index_reader = index_reader?;
let metadata = index_reader.metadata();
@@ -548,12 +543,12 @@ fn import_dump(
let mut wtxn = index.write_txn()?;
let mut builder = milli::update::Settings::new(&mut wtxn, &index, indexer_config);
// 6.1 Import the primary key if there is one.
// 5.1 Import the primary key if there is one.
if let Some(ref primary_key) = metadata.primary_key {
builder.set_primary_key(primary_key.to_string());
}
// 6.2 Import the settings.
// 5.2 Import the settings.
tracing::info!("Importing the settings.");
let settings = index_reader.settings()?;
apply_settings_to_builder(&settings, &mut builder);
@@ -565,8 +560,8 @@ fn import_dump(
let rtxn = index.read_txn()?;
if index_scheduler.no_edition_2024_for_dumps() {
// 6.3 Import the documents.
// 6.3.1 We need to recreate the grenad+obkv format accepted by the index.
// 5.3 Import the documents.
// 5.3.1 We need to recreate the grenad+obkv format accepted by the index.
tracing::info!("Importing the documents.");
let file = tempfile::tempfile()?;
let mut builder = DocumentsBatchBuilder::new(BufWriter::new(file));
@@ -577,7 +572,7 @@ fn import_dump(
// This flush the content of the batch builder.
let file = builder.into_inner()?.into_inner()?;
// 6.3.2 We feed it to the milli index.
// 5.3.2 We feed it to the milli index.
let reader = BufReader::new(file);
let reader = DocumentsBatchReader::from_reader(reader)?;
@@ -628,6 +623,7 @@ fn import_dump(
&mut new_fields_ids_map,
&|| false, // never stop processing a dump
progress.clone(),
None,
)?;
let operation_stats = operation_stats.pop().unwrap();
@@ -656,15 +652,15 @@ fn import_dump(
index_scheduler.refresh_index_stats(&uid)?;
}
// 7. Import the queue
// 6. Import the queue
let mut index_scheduler_dump = index_scheduler.register_dumped_task()?;
// 7.1. Import the batches
// 6.1. Import the batches
for ret in dump_reader.batches()? {
let batch = ret?;
index_scheduler_dump.register_dumped_batch(batch)?;
}
// 7.2. Import the tasks
// 6.2. Import the tasks
for ret in dump_reader.tasks()? {
let (task, file) = ret?;
index_scheduler_dump.register_dumped_task(task, file)?;

View File

@@ -15,33 +15,30 @@ lazy_static! {
"Meilisearch number of degraded search requests"
))
.expect("Can't create a metric");
pub static ref MEILISEARCH_CHAT_SEARCHES_TOTAL: IntCounterVec = register_int_counter_vec!(
pub static ref MEILISEARCH_CHAT_SEARCH_REQUESTS: IntCounterVec = register_int_counter_vec!(
opts!(
"meilisearch_chat_searches_total",
"Total number of searches performed by the chat route"
"meilisearch_chat_search_requests",
"Meilisearch number of search requests performed by the chat route itself"
),
&["type"]
)
.expect("Can't create a metric");
pub static ref MEILISEARCH_CHAT_PROMPT_TOKENS_TOTAL: IntCounterVec = register_int_counter_vec!(
opts!("meilisearch_chat_prompt_tokens_total", "Total number of prompt tokens consumed"),
pub static ref MEILISEARCH_CHAT_PROMPT_TOKENS_USAGE: IntCounterVec = register_int_counter_vec!(
opts!("meilisearch_chat_prompt_tokens_usage", "Meilisearch Chat Prompt Tokens Usage"),
&["workspace", "model"]
)
.expect("Can't create a metric");
pub static ref MEILISEARCH_CHAT_COMPLETION_TOKENS_TOTAL: IntCounterVec =
pub static ref MEILISEARCH_CHAT_COMPLETION_TOKENS_USAGE: IntCounterVec =
register_int_counter_vec!(
opts!(
"meilisearch_chat_completion_tokens_total",
"Total number of completion tokens consumed"
"meilisearch_chat_completion_tokens_usage",
"Meilisearch Chat Completion Tokens Usage"
),
&["workspace", "model"]
)
.expect("Can't create a metric");
pub static ref MEILISEARCH_CHAT_TOKENS_TOTAL: IntCounterVec = register_int_counter_vec!(
opts!(
"meilisearch_chat_tokens_total",
"Total number of tokens consumed (prompt + completion)"
),
pub static ref MEILISEARCH_CHAT_TOTAL_TOKENS_USAGE: IntCounterVec = register_int_counter_vec!(
opts!("meilisearch_chat_total_tokens_usage", "Meilisearch Chat Total Tokens Usage"),
&["workspace", "model"]
)
.expect("Can't create a metric");

View File

@@ -206,13 +206,11 @@ pub struct Opt {
pub env: String,
/// Called whenever a task finishes so a third party can be notified.
/// See also the dedicated API `/webhooks`.
#[clap(long, env = MEILI_TASK_WEBHOOK_URL)]
pub task_webhook_url: Option<Url>,
/// The Authorization header to send on the webhook URL whenever
/// a task finishes so a third party can be notified.
/// See also the dedicated API `/webhooks`.
#[clap(long, env = MEILI_TASK_WEBHOOK_AUTHORIZATION_HEADER)]
pub task_webhook_authorization_header: Option<String>,

View File

@@ -50,8 +50,8 @@ use crate::error::MeilisearchHttpError;
use crate::extractors::authentication::policies::ActionPolicy;
use crate::extractors::authentication::{extract_token_from_request, GuardedData, Policy as _};
use crate::metrics::{
MEILISEARCH_CHAT_COMPLETION_TOKENS_TOTAL, MEILISEARCH_CHAT_PROMPT_TOKENS_TOTAL,
MEILISEARCH_CHAT_SEARCHES_TOTAL, MEILISEARCH_CHAT_TOKENS_TOTAL,
MEILISEARCH_CHAT_COMPLETION_TOKENS_USAGE, MEILISEARCH_CHAT_PROMPT_TOKENS_USAGE,
MEILISEARCH_CHAT_SEARCH_REQUESTS, MEILISEARCH_CHAT_TOTAL_TOKENS_USAGE,
MEILISEARCH_DEGRADED_SEARCH_REQUESTS,
};
use crate::routes::chats::utils::SseEventSender;
@@ -319,7 +319,7 @@ async fn process_search_request(
};
let mut documents = Vec::new();
if let Ok((ref rtxn, ref search_result)) = output {
MEILISEARCH_CHAT_SEARCHES_TOTAL.with_label_values(&["internal"]).inc();
MEILISEARCH_CHAT_SEARCH_REQUESTS.with_label_values(&["internal"]).inc();
if search_result.degraded {
MEILISEARCH_DEGRADED_SEARCH_REQUESTS.inc();
}
@@ -596,13 +596,13 @@ async fn run_conversation<C: async_openai::config::Config>(
match result {
Ok(resp) => {
if let Some(usage) = resp.usage.as_ref() {
MEILISEARCH_CHAT_PROMPT_TOKENS_TOTAL
MEILISEARCH_CHAT_PROMPT_TOKENS_USAGE
.with_label_values(&[workspace_uid, &chat_completion.model])
.inc_by(usage.prompt_tokens as u64);
MEILISEARCH_CHAT_COMPLETION_TOKENS_TOTAL
MEILISEARCH_CHAT_COMPLETION_TOKENS_USAGE
.with_label_values(&[workspace_uid, &chat_completion.model])
.inc_by(usage.completion_tokens as u64);
MEILISEARCH_CHAT_TOKENS_TOTAL
MEILISEARCH_CHAT_TOTAL_TOKENS_USAGE
.with_label_values(&[workspace_uid, &chat_completion.model])
.inc_by(usage.total_tokens as u64);
}

View File

@@ -45,6 +45,7 @@ use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData;
use crate::extractors::payload::Payload;
use crate::extractors::sequential_extractor::SeqHandler;
use crate::routes::indexes::proxy::{proxy, Body};
use crate::routes::indexes::search::fix_sort_query_parameters;
use crate::routes::{
get_task_id, is_dry_run, PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT,
@@ -334,6 +335,7 @@ pub async fn delete_document(
) -> Result<HttpResponse, ResponseError> {
let DocumentParam { index_uid, document_id } = path.into_inner();
let index_uid = IndexUid::try_from(index_uid)?;
let network = index_scheduler.network();
analytics.publish(
DocumentsDeletionAggregator {
@@ -351,10 +353,16 @@ pub async fn delete_document(
};
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
let task = {
let index_scheduler = index_scheduler.clone();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
};
if network.sharding && !dry_run {
proxy(&index_scheduler, &index_uid, &req, network, Body::none(), &task).await?;
}
let task: SummarizedTaskView = task.into();
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task))
}
@@ -792,7 +800,6 @@ pub async fn replace_documents(
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task = document_addition(
extract_mime_type(&req)?,
index_scheduler,
index_uid,
params.primary_key,
@@ -802,8 +809,10 @@ pub async fn replace_documents(
uid,
dry_run,
allow_index_creation,
&req,
)
.await?;
debug!(returns = ?task, "Replace documents");
Ok(HttpResponse::Accepted().json(task))
@@ -893,7 +902,6 @@ pub async fn update_documents(
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task = document_addition(
extract_mime_type(&req)?,
index_scheduler,
index_uid,
params.primary_key,
@@ -903,6 +911,7 @@ pub async fn update_documents(
uid,
dry_run,
allow_index_creation,
&req,
)
.await?;
debug!(returns = ?task, "Update documents");
@@ -912,7 +921,6 @@ pub async fn update_documents(
#[allow(clippy::too_many_arguments)]
async fn document_addition(
mime_type: Option<Mime>,
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ADD }>, Data<IndexScheduler>>,
index_uid: IndexUid,
primary_key: Option<String>,
@@ -922,7 +930,11 @@ async fn document_addition(
task_id: Option<TaskId>,
dry_run: bool,
allow_index_creation: bool,
req: &HttpRequest,
) -> Result<SummarizedTaskView, MeilisearchHttpError> {
let mime_type = extract_mime_type(req)?;
let network = index_scheduler.network();
let format = match (
mime_type.as_ref().map(|m| (m.type_().as_str(), m.subtype().as_str())),
csv_delimiter,
@@ -954,7 +966,7 @@ async fn document_addition(
};
let (uuid, mut update_file) = index_scheduler.queue.create_update_file(dry_run)?;
let documents_count = match format {
let res = match format {
PayloadType::Ndjson => {
let (path, file) = update_file.into_parts();
let file = match file {
@@ -969,19 +981,19 @@ async fn document_addition(
None => None,
};
let documents_count = tokio::task::spawn_blocking(move || {
let res = tokio::task::spawn_blocking(move || {
let documents_count = file.as_ref().map_or(Ok(0), |ntf| {
read_ndjson(ntf.as_file()).map_err(MeilisearchHttpError::DocumentFormat)
})?;
let update_file = file_store::File::from_parts(path, file);
update_file.persist()?;
let update_file = update_file.persist()?;
Ok(documents_count)
Ok((documents_count, update_file))
})
.await?;
Ok(documents_count)
Ok(res)
}
PayloadType::Json | PayloadType::Csv { delimiter: _ } => {
let temp_file = match tempfile() {
@@ -1000,16 +1012,16 @@ async fn document_addition(
unreachable!("We already wrote the user content into the update file")
}
};
// we NEED to persist the file here because we moved the `udpate_file` in another task.
update_file.persist()?;
Ok(documents_count)
// we NEED to persist the file here because we moved the `update_file` in another task.
let file = update_file.persist()?;
Ok((documents_count, file))
})
.await
}
};
let documents_count = match documents_count {
Ok(Ok(documents_count)) => documents_count,
let (documents_count, file) = match res {
Ok(Ok((documents_count, file))) => (documents_count, file),
// in this case the file has not possibly be persisted.
Ok(Err(e)) => return Err(e),
Err(e) => {
@@ -1051,6 +1063,20 @@ async fn document_addition(
}
};
if network.sharding {
if let Some(file) = file {
proxy(
&index_scheduler,
&index_uid,
req,
network,
Body::with_ndjson_payload(file),
&task,
)
.await?;
}
}
Ok(task.into())
}
@@ -1129,6 +1155,7 @@ pub async fn delete_documents_batch(
) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?body, "Delete documents by batch");
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
let network = index_scheduler.network();
analytics.publish(
DocumentsDeletionAggregator {
@@ -1149,16 +1176,22 @@ pub async fn delete_documents_batch(
KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids };
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
let task = {
let index_scheduler = index_scheduler.clone();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
};
if network.sharding && !dry_run {
proxy(&index_scheduler, &index_uid, &req, network, Body::Inline(body), &task).await?;
}
let task: SummarizedTaskView = task.into();
debug!(returns = ?task, "Delete documents by batch");
Ok(HttpResponse::Accepted().json(task))
}
#[derive(Debug, Deserr, ToSchema)]
#[derive(Debug, Deserr, ToSchema, Serialize)]
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
#[schema(rename_all = "camelCase")]
pub struct DocumentDeletionByFilter {
@@ -1207,7 +1240,8 @@ pub async fn delete_documents_by_filter(
debug!(parameters = ?body, "Delete documents by filter");
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
let index_uid = index_uid.into_inner();
let filter = body.into_inner().filter;
let filter = body.into_inner();
let network = index_scheduler.network();
analytics.publish(
DocumentsDeletionAggregator {
@@ -1220,23 +1254,36 @@ pub async fn delete_documents_by_filter(
);
// we ensure the filter is well formed before enqueuing it
crate::search::parse_filter(&filter, Code::InvalidDocumentFilter, index_scheduler.features())?
.ok_or(MeilisearchHttpError::EmptyFilter)?;
crate::search::parse_filter(
&filter.filter,
Code::InvalidDocumentFilter,
index_scheduler.features(),
)?
.ok_or(MeilisearchHttpError::EmptyFilter)?;
let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter };
let task = KindWithContent::DocumentDeletionByFilter {
index_uid: index_uid.clone(),
filter_expr: filter.filter.clone(),
};
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
let task = {
let index_scheduler = index_scheduler.clone();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
};
if network.sharding && !dry_run {
proxy(&index_scheduler, &index_uid, &req, network, Body::Inline(filter), &task).await?;
}
let task: SummarizedTaskView = task.into();
debug!(returns = ?task, "Delete documents by filter");
Ok(HttpResponse::Accepted().json(task))
}
#[derive(Debug, Deserr, ToSchema)]
#[derive(Debug, Deserr, ToSchema, Serialize)]
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
pub struct DocumentEditionByFunction {
/// A string containing a RHAI function.
@@ -1324,6 +1371,8 @@ pub async fn edit_documents_by_function(
.features()
.check_edit_documents_by_function("Using the documents edit route")?;
let network = index_scheduler.network();
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
let index_uid = index_uid.into_inner();
let params = params.into_inner();
@@ -1337,13 +1386,12 @@ pub async fn edit_documents_by_function(
&req,
);
let DocumentEditionByFunction { filter, context, function } = params;
let engine = milli::rhai::Engine::new();
if let Err(e) = engine.compile(&function) {
if let Err(e) = engine.compile(&params.function) {
return Err(ResponseError::from_msg(e.to_string(), Code::BadRequest));
}
if let Some(ref filter) = filter {
if let Some(ref filter) = params.filter {
// we ensure the filter is well formed before enqueuing it
crate::search::parse_filter(
filter,
@@ -1353,9 +1401,9 @@ pub async fn edit_documents_by_function(
.ok_or(MeilisearchHttpError::EmptyFilter)?;
}
let task = KindWithContent::DocumentEdition {
index_uid,
filter_expr: filter,
context: match context {
index_uid: index_uid.clone(),
filter_expr: params.filter.clone(),
context: match params.context.clone() {
Some(Value::Object(m)) => Some(m),
None => None,
_ => {
@@ -1365,15 +1413,21 @@ pub async fn edit_documents_by_function(
))
}
},
function,
function: params.function.clone(),
};
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
let task = {
let index_scheduler = index_scheduler.clone();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
};
if network.sharding && !dry_run {
proxy(&index_scheduler, &index_uid, &req, network, Body::Inline(params), &task).await?;
}
let task: SummarizedTaskView = task.into();
debug!(returns = ?task, "Edit documents by function");
Ok(HttpResponse::Accepted().json(task))
@@ -1416,6 +1470,8 @@ pub async fn clear_all_documents(
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
let network = index_scheduler.network();
analytics.publish(
DocumentsDeletionAggregator {
clear_all: true,
@@ -1429,10 +1485,18 @@ pub async fn clear_all_documents(
let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() };
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
let task = {
let index_scheduler = index_scheduler.clone();
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run)).await??
};
if network.sharding && !dry_run {
proxy(&index_scheduler, &index_uid, &req, network, Body::none(), &task).await?;
}
let task: SummarizedTaskView = task.into();
debug!(returns = ?task, "Delete all documents");
Ok(HttpResponse::Accepted().json(task))

View File

@@ -30,6 +30,7 @@ use crate::Opt;
pub mod documents;
pub mod facet_search;
mod proxy;
pub mod search;
mod search_analytics;
#[cfg(test)]
@@ -39,6 +40,8 @@ mod settings_analytics;
pub mod similar;
mod similar_analytics;
pub use proxy::{PROXY_ORIGIN_REMOTE_HEADER, PROXY_ORIGIN_TASK_UID_HEADER};
#[derive(OpenApi)]
#[openapi(
nest(

View File

@@ -0,0 +1,424 @@
// Copyright © 2025 Meilisearch Some Rights Reserved
// This file is part of Meilisearch Enterprise Edition (EE).
// Use of this source code is governed by the Business Source License 1.1,
// as found in the LICENSE-EE file or at <https://mariadb.com/bsl11>
use std::collections::BTreeMap;
use std::fs::File;
use actix_web::http::header::CONTENT_TYPE;
use actix_web::HttpRequest;
use bytes::Bytes;
use index_scheduler::IndexScheduler;
use meilisearch_types::error::ResponseError;
use meilisearch_types::tasks::{Origin, RemoteTask, TaskNetwork};
use reqwest::StatusCode;
use serde::de::DeserializeOwned;
use serde_json::Value;
use crate::error::MeilisearchHttpError;
use crate::routes::indexes::proxy::error::{ProxyDocumentChangeError, ReqwestErrorWithoutUrl};
use crate::routes::SummarizedTaskView;
pub enum Body<T: serde::Serialize> {
NdJsonPayload(File),
Inline(T),
None,
}
impl Body<()> {
pub fn with_ndjson_payload(file: File) -> Self {
Self::NdJsonPayload(file)
}
pub fn none() -> Self {
Self::None
}
}
/// If necessary, proxies the passed request to the network and update the task description.
///
/// This function reads the custom headers from the request to determine if must proxy the request or if the request
/// has already been proxied.
///
/// - when it must proxy the request, the endpoint, method and query params are retrieved from the passed `req`, then the `body` is
/// sent to all remotes of the `network` (except `self`). The response from the remotes are collected to update the passed `task`
/// with the task ids from the task queues of the remotes.
/// - when the request has already been proxied, the custom headers contains information about the remote that created the initial task.
/// This information is copied to the passed task.
pub async fn proxy<T: serde::Serialize>(
index_scheduler: &IndexScheduler,
index_uid: &str,
req: &HttpRequest,
network: meilisearch_types::network::Network,
body: Body<T>,
task: &meilisearch_types::tasks::Task,
) -> Result<(), MeilisearchHttpError> {
match origin_from_req(req)? {
Some(origin) => {
index_scheduler.set_task_network(task.uid, TaskNetwork::Origin { origin })?
}
None => {
let this = network
.local
.as_deref()
.expect("inconsistent `network.sharding` and `network.self`")
.to_owned();
let content_type = match &body {
// for file bodies, force x-ndjson
Body::NdJsonPayload(_) => Some(b"application/x-ndjson".as_slice()),
// otherwise get content type from request
_ => req.headers().get(CONTENT_TYPE).map(|h| h.as_bytes()),
};
let body = match body {
Body::NdJsonPayload(file) => Some(Bytes::from_owner(unsafe {
memmap2::Mmap::map(&file).map_err(|err| {
MeilisearchHttpError::from_milli(err.into(), Some(index_uid.to_owned()))
})?
})),
Body::Inline(payload) => {
Some(Bytes::copy_from_slice(&serde_json::to_vec(&payload).unwrap()))
}
Body::None => None,
};
let mut in_flight_remote_queries = BTreeMap::new();
let client = reqwest::ClientBuilder::new()
.connect_timeout(std::time::Duration::from_secs(3))
.build()
.unwrap();
let method = from_old_http_method(req.method());
// send payload to all remotes
for (node_name, node) in
network.remotes.into_iter().filter(|(name, _)| name.as_str() != this)
{
let body = body.clone();
let client = client.clone();
let api_key = node.write_api_key;
let this = this.clone();
let method = method.clone();
let path_and_query =
req.uri().path_and_query().map(|paq| paq.as_str()).unwrap_or("/");
in_flight_remote_queries.insert(
node_name,
tokio::spawn({
let url = format!("{}{}", node.url, path_and_query);
let url_encoded_this = urlencoding::encode(&this).into_owned();
let url_encoded_task_uid = task.uid.to_string(); // it's url encoded i promize
let content_type = content_type.map(|b| b.to_owned());
let backoff = backoff::ExponentialBackoffBuilder::new()
.with_max_elapsed_time(Some(std::time::Duration::from_secs(25)))
.build();
backoff::future::retry(backoff, move || {
let url = url.clone();
let client = client.clone();
let url_encoded_this = url_encoded_this.clone();
let url_encoded_task_uid = url_encoded_task_uid.clone();
let content_type = content_type.clone();
let body = body.clone();
let api_key = api_key.clone();
let method = method.clone();
async move {
try_proxy(
method,
&url,
content_type.as_deref(),
api_key.as_deref(),
&client,
&url_encoded_this,
&url_encoded_task_uid,
body,
)
.await
}
})
}),
);
}
// wait for all in-flight queries to finish and collect their results
let mut remote_tasks: BTreeMap<String, RemoteTask> = BTreeMap::new();
for (node_name, handle) in in_flight_remote_queries {
match handle.await {
Ok(Ok(res)) => {
let task_uid = res.task_uid;
remote_tasks.insert(node_name, Ok(task_uid).into());
}
Ok(Err(error)) => {
remote_tasks.insert(node_name, Err(error.as_response_error()).into());
}
Err(panic) => match panic.try_into_panic() {
Ok(panic) => {
let msg = match panic.downcast_ref::<&'static str>() {
Some(s) => *s,
None => match panic.downcast_ref::<String>() {
Some(s) => &s[..],
None => "Box<dyn Any>",
},
};
remote_tasks.insert(
node_name,
Err(ResponseError::from_msg(
msg.to_string(),
meilisearch_types::error::Code::Internal,
))
.into(),
);
}
Err(_) => {
tracing::error!("proxy task was unexpectedly cancelled")
}
},
}
}
// edit details to contain the return values from the remotes
index_scheduler.set_task_network(task.uid, TaskNetwork::Remotes { remote_tasks })?;
}
}
Ok(())
}
fn from_old_http_method(method: &actix_http::Method) -> reqwest::Method {
match method {
&actix_http::Method::CONNECT => reqwest::Method::CONNECT,
&actix_http::Method::DELETE => reqwest::Method::DELETE,
&actix_http::Method::GET => reqwest::Method::GET,
&actix_http::Method::HEAD => reqwest::Method::HEAD,
&actix_http::Method::OPTIONS => reqwest::Method::OPTIONS,
&actix_http::Method::PATCH => reqwest::Method::PATCH,
&actix_http::Method::POST => reqwest::Method::POST,
&actix_http::Method::PUT => reqwest::Method::PUT,
&actix_http::Method::TRACE => reqwest::Method::TRACE,
method => reqwest::Method::from_bytes(method.as_str().as_bytes()).unwrap(),
}
}
#[allow(clippy::too_many_arguments)]
async fn try_proxy(
method: reqwest::Method,
url: &str,
content_type: Option<&[u8]>,
api_key: Option<&str>,
client: &reqwest::Client,
url_encoded_this: &str,
url_encoded_task_uid: &str,
body: Option<Bytes>,
) -> Result<SummarizedTaskView, backoff::Error<ProxyDocumentChangeError>> {
let request = client.request(method, url).timeout(std::time::Duration::from_secs(30));
let request = if let Some(body) = body { request.body(body) } else { request };
let request = if let Some(api_key) = api_key { request.bearer_auth(api_key) } else { request };
let request = request.header(PROXY_ORIGIN_TASK_UID_HEADER, url_encoded_task_uid);
let request = request.header(PROXY_ORIGIN_REMOTE_HEADER, url_encoded_this);
let request = if let Some(content_type) = content_type {
request.header(CONTENT_TYPE.as_str(), content_type)
} else {
request
};
let response = request.send().await;
let response = match response {
Ok(response) => response,
Err(error) if error.is_timeout() => {
return Err(backoff::Error::transient(ProxyDocumentChangeError::Timeout))
}
Err(error) => {
return Err(backoff::Error::transient(ProxyDocumentChangeError::CouldNotSendRequest(
ReqwestErrorWithoutUrl::new(error),
)))
}
};
match response.status() {
status_code if status_code.is_success() => (),
StatusCode::UNAUTHORIZED | StatusCode::FORBIDDEN => {
return Err(backoff::Error::Permanent(ProxyDocumentChangeError::AuthenticationError))
}
status_code if status_code.is_client_error() => {
let response = parse_error(response).await;
return Err(backoff::Error::Permanent(ProxyDocumentChangeError::BadRequest {
status_code,
response,
}));
}
status_code if status_code.is_server_error() => {
let response = parse_error(response).await;
return Err(backoff::Error::transient(ProxyDocumentChangeError::RemoteError {
status_code,
response,
}));
}
status_code => {
tracing::warn!(
status_code = status_code.as_u16(),
"remote replied with unexpected status code"
);
}
}
let response = match parse_response(response).await {
Ok(response) => response,
Err(response) => {
return Err(backoff::Error::transient(
ProxyDocumentChangeError::CouldNotParseResponse { response },
))
}
};
Ok(response)
}
async fn parse_error(response: reqwest::Response) -> Result<String, ReqwestErrorWithoutUrl> {
let bytes = match response.bytes().await {
Ok(bytes) => bytes,
Err(error) => return Err(ReqwestErrorWithoutUrl::new(error)),
};
Ok(parse_bytes_as_error(&bytes))
}
fn parse_bytes_as_error(bytes: &[u8]) -> String {
match serde_json::from_slice::<Value>(bytes) {
Ok(value) => value.to_string(),
Err(_) => String::from_utf8_lossy(bytes).into_owned(),
}
}
async fn parse_response<T: DeserializeOwned>(
response: reqwest::Response,
) -> Result<T, Result<String, ReqwestErrorWithoutUrl>> {
let bytes = match response.bytes().await {
Ok(bytes) => bytes,
Err(error) => return Err(Err(ReqwestErrorWithoutUrl::new(error))),
};
match serde_json::from_slice::<T>(&bytes) {
Ok(value) => Ok(value),
Err(_) => Err(Ok(parse_bytes_as_error(&bytes))),
}
}
mod error {
use meilisearch_types::error::ResponseError;
use reqwest::StatusCode;
#[derive(Debug, thiserror::Error)]
pub enum ProxyDocumentChangeError {
#[error("{0}")]
CouldNotSendRequest(ReqwestErrorWithoutUrl),
#[error("could not authenticate against the remote host\n - hint: check that the remote instance was registered with a valid API key having the `documents.add` action")]
AuthenticationError,
#[error(
"could not parse response from the remote host as a document addition response{}\n - hint: check that the remote instance is a Meilisearch instance running the same version",
response_from_remote(response)
)]
CouldNotParseResponse { response: Result<String, ReqwestErrorWithoutUrl> },
#[error("remote host responded with code {}{}\n - hint: check that the remote instance has the correct index configuration for that request\n - hint: check that the `network` experimental feature is enabled on the remote instance", status_code.as_u16(), response_from_remote(response))]
BadRequest { status_code: StatusCode, response: Result<String, ReqwestErrorWithoutUrl> },
#[error("remote host did not answer before the deadline")]
Timeout,
#[error("remote host responded with code {}{}", status_code.as_u16(), response_from_remote(response))]
RemoteError { status_code: StatusCode, response: Result<String, ReqwestErrorWithoutUrl> },
}
impl ProxyDocumentChangeError {
pub fn as_response_error(&self) -> ResponseError {
use meilisearch_types::error::Code;
let message = self.to_string();
let code = match self {
ProxyDocumentChangeError::CouldNotSendRequest(_) => Code::RemoteCouldNotSendRequest,
ProxyDocumentChangeError::AuthenticationError => Code::RemoteInvalidApiKey,
ProxyDocumentChangeError::BadRequest { .. } => Code::RemoteBadRequest,
ProxyDocumentChangeError::Timeout => Code::RemoteTimeout,
ProxyDocumentChangeError::RemoteError { .. } => Code::RemoteRemoteError,
ProxyDocumentChangeError::CouldNotParseResponse { .. } => Code::RemoteBadResponse,
};
ResponseError::from_msg(message, code)
}
}
#[derive(Debug, thiserror::Error)]
#[error(transparent)]
pub struct ReqwestErrorWithoutUrl(reqwest::Error);
impl ReqwestErrorWithoutUrl {
pub fn new(inner: reqwest::Error) -> Self {
Self(inner.without_url())
}
}
fn response_from_remote(response: &Result<String, ReqwestErrorWithoutUrl>) -> String {
match response {
Ok(response) => {
format!(":\n - response from remote: {}", response)
}
Err(error) => {
format!(":\n - additionally, could not retrieve response from remote: {error}")
}
}
}
}
pub const PROXY_ORIGIN_REMOTE_HEADER: &str = "Meili-Proxy-Origin-Remote";
pub const PROXY_ORIGIN_TASK_UID_HEADER: &str = "Meili-Proxy-Origin-TaskUid";
pub fn origin_from_req(req: &HttpRequest) -> Result<Option<Origin>, MeilisearchHttpError> {
let (remote_name, task_uid) = match (
req.headers().get(PROXY_ORIGIN_REMOTE_HEADER),
req.headers().get(PROXY_ORIGIN_TASK_UID_HEADER),
) {
(None, None) => return Ok(None),
(None, Some(_)) => {
return Err(MeilisearchHttpError::InconsistentOriginHeaders { is_remote_missing: true })
}
(Some(_), None) => {
return Err(MeilisearchHttpError::InconsistentOriginHeaders {
is_remote_missing: false,
})
}
(Some(remote_name), Some(task_uid)) => (
urlencoding::decode(remote_name.to_str().map_err(|err| {
MeilisearchHttpError::InvalidHeaderValue {
header_name: PROXY_ORIGIN_REMOTE_HEADER,
msg: format!("while parsing remote name as UTF-8: {err}"),
}
})?)
.map_err(|err| MeilisearchHttpError::InvalidHeaderValue {
header_name: PROXY_ORIGIN_REMOTE_HEADER,
msg: format!("while URL-decoding remote name: {err}"),
})?,
urlencoding::decode(task_uid.to_str().map_err(|err| {
MeilisearchHttpError::InvalidHeaderValue {
header_name: PROXY_ORIGIN_TASK_UID_HEADER,
msg: format!("while parsing task UID as UTF-8: {err}"),
}
})?)
.map_err(|err| MeilisearchHttpError::InvalidHeaderValue {
header_name: PROXY_ORIGIN_TASK_UID_HEADER,
msg: format!("while URL-decoding task UID: {err}"),
})?,
),
};
let task_uid: usize =
task_uid.parse().map_err(|err| MeilisearchHttpError::InvalidHeaderValue {
header_name: PROXY_ORIGIN_TASK_UID_HEADER,
msg: format!("while parsing the task UID as an integer: {err}"),
})?;
Ok(Some(Origin { remote_name: remote_name.into_owned(), task_uid }))
}

View File

@@ -511,7 +511,7 @@ make_setting_routes!(
},
{
route: "/chat",
update_verb: patch,
update_verb: put,
value_type: ChatSettings,
err_type: meilisearch_types::deserr::DeserrJsonError<
meilisearch_types::error::deserr_codes::InvalidSettingsIndexChat,

View File

@@ -41,7 +41,6 @@ use crate::routes::indexes::IndexView;
use crate::routes::multi_search::SearchResults;
use crate::routes::network::{Network, Remote};
use crate::routes::swap_indexes::SwapIndexesPayload;
use crate::routes::webhooks::{WebhookResults, WebhookSettings, WebhookWithMetadata};
use crate::search::{
FederatedSearch, FederatedSearchResult, Federation, FederationOptions, MergeFacets,
SearchQueryWithIndex, SearchResultWithIndex, SimilarQuery, SimilarResult,
@@ -71,7 +70,6 @@ mod swap_indexes;
pub mod tasks;
#[cfg(test)]
mod tasks_test;
mod webhooks;
#[derive(OpenApi)]
#[openapi(
@@ -91,7 +89,6 @@ mod webhooks;
(path = "/experimental-features", api = features::ExperimentalFeaturesApi),
(path = "/export", api = export::ExportApi),
(path = "/network", api = network::NetworkApi),
(path = "/webhooks", api = webhooks::WebhooksApi),
),
paths(get_health, get_version, get_stats),
tags(
@@ -102,7 +99,7 @@ mod webhooks;
url = "/",
description = "Local server",
)),
components(schemas(PaginationView<KeyView>, PaginationView<IndexView>, IndexView, DocumentDeletionByFilter, AllBatches, BatchStats, ProgressStepView, ProgressView, BatchView, RuntimeTogglableFeatures, SwapIndexesPayload, DocumentEditionByFunction, MergeFacets, FederationOptions, SearchQueryWithIndex, Federation, FederatedSearch, FederatedSearchResult, SearchResults, SearchResultWithIndex, SimilarQuery, SimilarResult, PaginationView<serde_json::Value>, BrowseQuery, UpdateIndexRequest, IndexUid, IndexCreateRequest, KeyView, Action, CreateApiKey, UpdateStderrLogs, LogMode, GetLogs, IndexStats, Stats, HealthStatus, HealthResponse, VersionResponse, Code, ErrorType, AllTasks, TaskView, Status, DetailsView, ResponseError, Settings<Unchecked>, Settings<Checked>, TypoSettings, MinWordSizeTyposSetting, FacetingSettings, PaginationSettings, SummarizedTaskView, Kind, Network, Remote, FilterableAttributesRule, FilterableAttributesPatterns, AttributePatterns, FilterableAttributesFeatures, FilterFeatures, Export, WebhookSettings, WebhookResults, WebhookWithMetadata))
components(schemas(PaginationView<KeyView>, PaginationView<IndexView>, IndexView, DocumentDeletionByFilter, AllBatches, BatchStats, ProgressStepView, ProgressView, BatchView, RuntimeTogglableFeatures, SwapIndexesPayload, DocumentEditionByFunction, MergeFacets, FederationOptions, SearchQueryWithIndex, Federation, FederatedSearch, FederatedSearchResult, SearchResults, SearchResultWithIndex, SimilarQuery, SimilarResult, PaginationView<serde_json::Value>, BrowseQuery, UpdateIndexRequest, IndexUid, IndexCreateRequest, KeyView, Action, CreateApiKey, UpdateStderrLogs, LogMode, GetLogs, IndexStats, Stats, HealthStatus, HealthResponse, VersionResponse, Code, ErrorType, AllTasks, TaskView, Status, DetailsView, ResponseError, Settings<Unchecked>, Settings<Checked>, TypoSettings, MinWordSizeTyposSetting, FacetingSettings, PaginationSettings, SummarizedTaskView, Kind, Network, Remote, FilterableAttributesRule, FilterableAttributesPatterns, AttributePatterns, FilterableAttributesFeatures, FilterFeatures, Export))
)]
pub struct MeilisearchApi;
@@ -123,8 +120,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
.service(web::scope("/experimental-features").configure(features::configure))
.service(web::scope("/network").configure(network::configure))
.service(web::scope("/export").configure(export::configure))
.service(web::scope("/chats").configure(chats::configure))
.service(web::scope("/webhooks").configure(webhooks::configure));
.service(web::scope("/chats").configure(chats::configure));
#[cfg(feature = "swagger")]
{
@@ -184,7 +180,7 @@ pub fn is_dry_run(req: &HttpRequest, opt: &Opt) -> Result<bool, ResponseError> {
.is_some_and(|s| s.to_lowercase() == "true"))
}
#[derive(Debug, Serialize, ToSchema)]
#[derive(Debug, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct SummarizedTaskView {
/// The task unique identifier.
@@ -198,7 +194,10 @@ pub struct SummarizedTaskView {
#[serde(rename = "type")]
kind: Kind,
/// The date on which the task was enqueued.
#[serde(serialize_with = "time::serde::rfc3339::serialize")]
#[serde(
serialize_with = "time::serde::rfc3339::serialize",
deserialize_with = "time::serde::rfc3339::deserialize"
)]
enqueued_at: OffsetDateTime,
}

View File

@@ -8,12 +8,13 @@ use index_scheduler::IndexScheduler;
use itertools::{EitherOrBoth, Itertools};
use meilisearch_types::deserr::DeserrJsonError;
use meilisearch_types::error::deserr_codes::{
InvalidNetworkRemotes, InvalidNetworkSearchApiKey, InvalidNetworkSelf, InvalidNetworkUrl,
InvalidNetworkRemotes, InvalidNetworkSearchApiKey, InvalidNetworkSelf, InvalidNetworkSharding,
InvalidNetworkUrl, InvalidNetworkWriteApiKey,
};
use meilisearch_types::error::ResponseError;
use meilisearch_types::features::{Network as DbNetwork, Remote as DbRemote};
use meilisearch_types::keys::actions;
use meilisearch_types::milli::update::Setting;
use meilisearch_types::network::{Network as DbNetwork, Remote as DbRemote};
use serde::Serialize;
use tracing::debug;
use utoipa::{OpenApi, ToSchema};
@@ -51,15 +52,15 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
get,
path = "",
tag = "Network",
security(("Bearer" = ["network.get", "*"])),
security(("Bearer" = ["network.get", "network.*", "*"])),
responses(
(status = OK, description = "Known nodes are returned", body = Network, content_type = "application/json", example = json!(
{
"self": "ms-0",
"remotes": {
"ms-0": Remote { url: Setting::Set("http://localhost:7700".into()), search_api_key: Setting::Reset },
"ms-1": Remote { url: Setting::Set("http://localhost:7701".into()), search_api_key: Setting::Set("foo".into()) },
"ms-2": Remote { url: Setting::Set("http://localhost:7702".into()), search_api_key: Setting::Set("bar".into()) },
"ms-0": Remote { url: Setting::Set("http://localhost:7700".into()), search_api_key: Setting::Reset, write_api_key: Setting::Reset },
"ms-1": Remote { url: Setting::Set("http://localhost:7701".into()), search_api_key: Setting::Set("foo".into()), write_api_key: Setting::Set("bar".into()) },
"ms-2": Remote { url: Setting::Set("http://localhost:7702".into()), search_api_key: Setting::Set("bar".into()), write_api_key: Setting::Set("foo".into()) },
}
})),
(status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json", example = json!(
@@ -88,9 +89,9 @@ async fn get_network(
#[schema(rename_all = "camelCase")]
pub struct Remote {
#[schema(value_type = Option<String>, example = json!({
"ms-0": Remote { url: Setting::Set("http://localhost:7700".into()), search_api_key: Setting::Reset },
"ms-1": Remote { url: Setting::Set("http://localhost:7701".into()), search_api_key: Setting::Set("foo".into()) },
"ms-2": Remote { url: Setting::Set("http://localhost:7702".into()), search_api_key: Setting::Set("bar".into()) },
"ms-0": Remote { url: Setting::Set("http://localhost:7700".into()), search_api_key: Setting::Reset, write_api_key: Setting::Reset },
"ms-1": Remote { url: Setting::Set("http://localhost:7701".into()), search_api_key: Setting::Set("foo".into()), write_api_key: Setting::Set("bar".into()) },
"ms-2": Remote { url: Setting::Set("http://localhost:7702".into()), search_api_key: Setting::Set("bar".into()), write_api_key: Setting::Set("foo".into()) },
}))]
#[deserr(default, error = DeserrJsonError<InvalidNetworkUrl>)]
#[serde(default)]
@@ -99,6 +100,10 @@ pub struct Remote {
#[deserr(default, error = DeserrJsonError<InvalidNetworkSearchApiKey>)]
#[serde(default)]
pub search_api_key: Setting<String>,
#[schema(value_type = Option<String>, example = json!("XWnBI8QHUc-4IlqbKPLUDuhftNq19mQtjc6JvmivzJU"))]
#[deserr(default, error = DeserrJsonError<InvalidNetworkWriteApiKey>)]
#[serde(default)]
pub write_api_key: Setting<String>,
}
#[derive(Debug, Deserr, ToSchema, Serialize)]
@@ -114,6 +119,10 @@ pub struct Network {
#[serde(default, rename = "self")]
#[deserr(default, rename = "self", error = DeserrJsonError<InvalidNetworkSelf>)]
pub local: Setting<String>,
#[schema(value_type = Option<bool>, example = json!(true))]
#[serde(default)]
#[deserr(default, error = DeserrJsonError<InvalidNetworkSharding>)]
pub sharding: Setting<bool>,
}
impl Remote {
@@ -136,6 +145,7 @@ impl Remote {
Ok(url)
})?,
search_api_key: self.search_api_key.set(),
write_api_key: self.write_api_key.set(),
})
}
}
@@ -168,15 +178,15 @@ impl Aggregate for PatchNetworkAnalytics {
path = "",
tag = "Network",
request_body = Network,
security(("Bearer" = ["network.update", "*"])),
security(("Bearer" = ["network.update", "network.*", "*"])),
responses(
(status = OK, description = "New network state is returned", body = Network, content_type = "application/json", example = json!(
{
"self": "ms-0",
"remotes": {
"ms-0": Remote { url: Setting::Set("http://localhost:7700".into()), search_api_key: Setting::Reset },
"ms-1": Remote { url: Setting::Set("http://localhost:7701".into()), search_api_key: Setting::Set("foo".into()) },
"ms-2": Remote { url: Setting::Set("http://localhost:7702".into()), search_api_key: Setting::Set("bar".into()) },
"ms-0": Remote { url: Setting::Set("http://localhost:7700".into()), search_api_key: Setting::Reset, write_api_key: Setting::Reset },
"ms-1": Remote { url: Setting::Set("http://localhost:7701".into()), search_api_key: Setting::Set("foo".into()), write_api_key: Setting::Set("bar".into()) },
"ms-2": Remote { url: Setting::Set("http://localhost:7702".into()), search_api_key: Setting::Set("bar".into()), write_api_key: Setting::Set("foo".into()) },
}
})),
(status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json", example = json!(
@@ -207,6 +217,19 @@ async fn patch_network(
Setting::NotSet => old_network.local,
};
let merged_sharding = match new_network.sharding {
Setting::Set(new_sharding) => new_sharding,
Setting::Reset => false,
Setting::NotSet => old_network.sharding,
};
if merged_sharding && merged_self.is_none() {
return Err(ResponseError::from_msg(
"`.sharding`: enabling the sharding requires `.self` to be set\n - Hint: Disable `sharding` or set `self` to a value.".into(),
meilisearch_types::error::Code::InvalidNetworkSharding,
));
}
let merged_remotes = match new_network.remotes {
Setting::Set(new_remotes) => {
let mut merged_remotes = BTreeMap::new();
@@ -217,9 +240,17 @@ async fn patch_network(
{
match either_or_both {
EitherOrBoth::Both((key, old), (_, Some(new))) => {
let DbRemote { url: old_url, search_api_key: old_search_api_key } = old;
let DbRemote {
url: old_url,
search_api_key: old_search_api_key,
write_api_key: old_write_api_key,
} = old;
let Remote { url: new_url, search_api_key: new_search_api_key } = new;
let Remote {
url: new_url,
search_api_key: new_search_api_key,
write_api_key: new_write_api_key,
} = new;
let merged = DbRemote {
url: match new_url {
@@ -247,6 +278,11 @@ async fn patch_network(
Setting::Reset => None,
Setting::NotSet => old_search_api_key,
},
write_api_key: match new_write_api_key {
Setting::Set(new_write_api_key) => Some(new_write_api_key),
Setting::Reset => None,
Setting::NotSet => old_write_api_key,
},
};
merged_remotes.insert(key, merged);
}
@@ -274,7 +310,8 @@ async fn patch_network(
&req,
);
let merged_network = DbNetwork { local: merged_self, remotes: merged_remotes };
let merged_network =
DbNetwork { local: merged_self, remotes: merged_remotes, sharding: merged_sharding };
index_scheduler.put_network(merged_network.clone())?;
debug!(returns = ?merged_network, "Patch network");
Ok(HttpResponse::Ok().json(merged_network))

View File

@@ -1,474 +0,0 @@
use std::collections::BTreeMap;
use std::str::FromStr;
use actix_http::header::{
HeaderName, HeaderValue, InvalidHeaderName as ActixInvalidHeaderName,
InvalidHeaderValue as ActixInvalidHeaderValue,
};
use actix_web::web::{self, Data, Path};
use actix_web::{HttpRequest, HttpResponse};
use core::convert::Infallible;
use deserr::actix_web::AwebJson;
use deserr::{DeserializeError, Deserr, ValuePointerRef};
use index_scheduler::IndexScheduler;
use meilisearch_types::deserr::{immutable_field_error, DeserrJsonError};
use meilisearch_types::error::deserr_codes::{
BadRequest, InvalidWebhookHeaders, InvalidWebhookUrl,
};
use meilisearch_types::error::{Code, ErrorCode, ResponseError};
use meilisearch_types::keys::actions;
use meilisearch_types::milli::update::Setting;
use meilisearch_types::webhooks::Webhook;
use serde::Serialize;
use tracing::debug;
use url::Url;
use utoipa::{OpenApi, ToSchema};
use uuid::Uuid;
use crate::analytics::{Aggregate, Analytics};
use crate::extractors::authentication::policies::ActionPolicy;
use crate::extractors::authentication::GuardedData;
use crate::extractors::sequential_extractor::SeqHandler;
use WebhooksError::*;
#[derive(OpenApi)]
#[openapi(
paths(get_webhooks, get_webhook, post_webhook, patch_webhook, delete_webhook),
tags((
name = "Webhooks",
description = "The `/webhooks` route allows you to register endpoints to be called once tasks are processed.",
external_docs(url = "https://www.meilisearch.com/docs/reference/api/webhooks"),
)),
)]
pub struct WebhooksApi;
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(
web::resource("")
.route(web::get().to(get_webhooks))
.route(web::post().to(SeqHandler(post_webhook))),
)
.service(
web::resource("/{uuid}")
.route(web::get().to(get_webhook))
.route(web::patch().to(SeqHandler(patch_webhook)))
.route(web::delete().to(SeqHandler(delete_webhook))),
);
}
#[derive(Debug, Deserr, ToSchema)]
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields = deny_immutable_fields_webhook)]
#[serde(rename_all = "camelCase")]
#[schema(rename_all = "camelCase")]
pub(super) struct WebhookSettings {
#[schema(value_type = Option<String>, example = "https://your.site/on-tasks-completed")]
#[deserr(default, error = DeserrJsonError<InvalidWebhookUrl>)]
#[serde(default)]
url: Setting<String>,
#[schema(value_type = Option<BTreeMap<String, String>>, example = json!({"Authorization":"Bearer a-secret-token"}))]
#[deserr(default, error = DeserrJsonError<InvalidWebhookHeaders>)]
#[serde(default)]
headers: Setting<BTreeMap<String, Setting<String>>>,
}
fn deny_immutable_fields_webhook(
field: &str,
accepted: &[&str],
location: ValuePointerRef,
) -> DeserrJsonError {
match field {
"uuid" => immutable_field_error(field, accepted, Code::ImmutableWebhookUuid),
"isEditable" => immutable_field_error(field, accepted, Code::ImmutableWebhookIsEditable),
_ => deserr::take_cf_content(DeserrJsonError::<BadRequest>::error::<Infallible>(
None,
deserr::ErrorKind::UnknownKey { key: field, accepted },
location,
)),
}
}
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
#[schema(rename_all = "camelCase")]
pub(super) struct WebhookWithMetadata {
uuid: Uuid,
is_editable: bool,
#[schema(value_type = WebhookSettings)]
#[serde(flatten)]
webhook: Webhook,
}
impl WebhookWithMetadata {
pub fn from(uuid: Uuid, webhook: Webhook) -> Self {
Self { uuid, is_editable: uuid != Uuid::nil(), webhook }
}
}
#[derive(Debug, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub(super) struct WebhookResults {
results: Vec<WebhookWithMetadata>,
}
#[utoipa::path(
get,
path = "",
tag = "Webhooks",
security(("Bearer" = ["webhooks.get", "webhooks.*", "*.get", "*"])),
responses(
(status = OK, description = "Webhooks are returned", body = WebhookResults, content_type = "application/json", example = json!({
"results": [
{
"uuid": "550e8400-e29b-41d4-a716-446655440000",
"url": "https://your.site/on-tasks-completed",
"headers": {
"Authorization": "Bearer a-secret-token"
},
"isEditable": true
},
{
"uuid": "550e8400-e29b-41d4-a716-446655440001",
"url": "https://another.site/on-tasks-completed",
"isEditable": true
}
]
})),
(status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json", example = json!(
{
"message": "The Authorization header is missing. It must use the bearer authorization method.",
"code": "missing_authorization_header",
"type": "auth",
"link": "https://docs.meilisearch.com/errors#missing_authorization_header"
}
)),
)
)]
async fn get_webhooks(
index_scheduler: GuardedData<ActionPolicy<{ actions::WEBHOOKS_GET }>, Data<IndexScheduler>>,
) -> Result<HttpResponse, ResponseError> {
let webhooks = index_scheduler.webhooks_view();
let results = webhooks
.webhooks
.into_iter()
.map(|(uuid, webhook)| WebhookWithMetadata::from(uuid, webhook))
.collect::<Vec<_>>();
let results = WebhookResults { results };
debug!(returns = ?results, "Get webhooks");
Ok(HttpResponse::Ok().json(results))
}
#[derive(Serialize, Default)]
pub struct PatchWebhooksAnalytics;
impl Aggregate for PatchWebhooksAnalytics {
fn event_name(&self) -> &'static str {
"Webhooks Updated"
}
fn aggregate(self: Box<Self>, _new: Box<Self>) -> Box<Self> {
self
}
fn into_event(self: Box<Self>) -> serde_json::Value {
serde_json::to_value(*self).unwrap_or_default()
}
}
#[derive(Serialize, Default)]
pub struct PostWebhooksAnalytics;
impl Aggregate for PostWebhooksAnalytics {
fn event_name(&self) -> &'static str {
"Webhooks Created"
}
fn aggregate(self: Box<Self>, _new: Box<Self>) -> Box<Self> {
self
}
fn into_event(self: Box<Self>) -> serde_json::Value {
serde_json::to_value(*self).unwrap_or_default()
}
}
#[derive(Debug, thiserror::Error)]
enum WebhooksError {
#[error("The URL for the webhook `{0}` is missing.")]
MissingUrl(Uuid),
#[error("Defining too many webhooks would crush the server. Please limit the number of webhooks to 20. You may use a third-party proxy server to dispatch events to more than 20 endpoints.")]
TooManyWebhooks,
#[error("Too many headers for the webhook `{0}`. Please limit the number of headers to 200. Hint: To remove an already defined header set its value to `null`")]
TooManyHeaders(Uuid),
#[error("Webhook `{0}` is immutable. The webhook defined from the command line cannot be modified using the API.")]
ImmutableWebhook(Uuid),
#[error("Webhook `{0}` not found.")]
WebhookNotFound(Uuid),
#[error("Invalid header name `{0}`: {1}")]
InvalidHeaderName(String, ActixInvalidHeaderName),
#[error("Invalid header value `{0}`: {1}")]
InvalidHeaderValue(String, ActixInvalidHeaderValue),
#[error("Invalid URL `{0}`: {1}")]
InvalidUrl(String, url::ParseError),
#[error("Invalid UUID: {0}")]
InvalidUuid(uuid::Error),
}
impl ErrorCode for WebhooksError {
fn error_code(&self) -> meilisearch_types::error::Code {
match self {
MissingUrl(_) => meilisearch_types::error::Code::InvalidWebhookUrl,
TooManyWebhooks => meilisearch_types::error::Code::InvalidWebhooks,
TooManyHeaders(_) => meilisearch_types::error::Code::InvalidWebhookHeaders,
ImmutableWebhook(_) => meilisearch_types::error::Code::ImmutableWebhook,
WebhookNotFound(_) => meilisearch_types::error::Code::WebhookNotFound,
InvalidHeaderName(_, _) => meilisearch_types::error::Code::InvalidWebhookHeaders,
InvalidHeaderValue(_, _) => meilisearch_types::error::Code::InvalidWebhookHeaders,
InvalidUrl(_, _) => meilisearch_types::error::Code::InvalidWebhookUrl,
InvalidUuid(_) => meilisearch_types::error::Code::InvalidWebhookUuid,
}
}
}
fn patch_webhook_inner(
uuid: &Uuid,
old_webhook: Webhook,
new_webhook: WebhookSettings,
) -> Result<Webhook, WebhooksError> {
let Webhook { url: old_url, mut headers } = old_webhook;
let url = match new_webhook.url {
Setting::Set(url) => url,
Setting::NotSet => old_url,
Setting::Reset => return Err(MissingUrl(uuid.to_owned())),
};
match new_webhook.headers {
Setting::Set(new_headers) => {
for (name, value) in new_headers {
match value {
Setting::Set(value) => {
headers.insert(name, value);
}
Setting::NotSet => continue,
Setting::Reset => {
headers.remove(&name);
continue;
}
}
}
}
Setting::Reset => headers.clear(),
Setting::NotSet => (),
};
if headers.len() > 200 {
return Err(TooManyHeaders(uuid.to_owned()));
}
Ok(Webhook { url, headers })
}
fn check_changed(uuid: Uuid, webhook: &Webhook) -> Result<(), WebhooksError> {
if uuid.is_nil() {
return Err(ImmutableWebhook(uuid));
}
if webhook.url.is_empty() {
return Err(MissingUrl(uuid));
}
if webhook.headers.len() > 200 {
return Err(TooManyHeaders(uuid));
}
for (header, value) in &webhook.headers {
HeaderName::from_bytes(header.as_bytes())
.map_err(|e| InvalidHeaderName(header.to_owned(), e))?;
HeaderValue::from_str(value).map_err(|e| InvalidHeaderValue(header.to_owned(), e))?;
}
if let Err(e) = Url::parse(&webhook.url) {
return Err(InvalidUrl(webhook.url.to_owned(), e));
}
Ok(())
}
#[utoipa::path(
get,
path = "/{uuid}",
tag = "Webhooks",
security(("Bearer" = ["webhooks.get", "webhooks.*", "*.get", "*"])),
responses(
(status = 200, description = "Webhook found", body = WebhookWithMetadata, content_type = "application/json", example = json!({
"uuid": "550e8400-e29b-41d4-a716-446655440000",
"url": "https://your.site/on-tasks-completed",
"headers": {
"Authorization": "Bearer a-secret"
},
"isEditable": true
})),
(status = 404, description = "Webhook not found", body = ResponseError, content_type = "application/json"),
(status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json"),
),
params(
("uuid" = Uuid, Path, description = "The universally unique identifier of the webhook")
)
)]
async fn get_webhook(
index_scheduler: GuardedData<ActionPolicy<{ actions::WEBHOOKS_GET }>, Data<IndexScheduler>>,
uuid: Path<String>,
) -> Result<HttpResponse, ResponseError> {
let uuid = Uuid::from_str(&uuid.into_inner()).map_err(InvalidUuid)?;
let mut webhooks = index_scheduler.webhooks_view();
let webhook = webhooks.webhooks.remove(&uuid).ok_or(WebhookNotFound(uuid))?;
let webhook = WebhookWithMetadata::from(uuid, webhook);
debug!(returns = ?webhook, "Get webhook");
Ok(HttpResponse::Ok().json(webhook))
}
#[utoipa::path(
post,
path = "",
tag = "Webhooks",
request_body = WebhookSettings,
security(("Bearer" = ["webhooks.create", "webhooks.*", "*"])),
responses(
(status = 201, description = "Webhook created successfully", body = WebhookWithMetadata, content_type = "application/json", example = json!({
"uuid": "550e8400-e29b-41d4-a716-446655440000",
"url": "https://your.site/on-tasks-completed",
"headers": {
"Authorization": "Bearer a-secret-token"
},
"isEditable": true
})),
(status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json"),
(status = 400, description = "Bad request", body = ResponseError, content_type = "application/json"),
)
)]
async fn post_webhook(
index_scheduler: GuardedData<ActionPolicy<{ actions::WEBHOOKS_CREATE }>, Data<IndexScheduler>>,
webhook_settings: AwebJson<WebhookSettings, DeserrJsonError>,
req: HttpRequest,
analytics: Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
let webhook_settings = webhook_settings.into_inner();
debug!(parameters = ?webhook_settings, "Post webhook");
let uuid = Uuid::new_v4();
if webhook_settings.headers.as_ref().set().is_some_and(|h| h.len() > 200) {
return Err(TooManyHeaders(uuid).into());
}
let mut webhooks = index_scheduler.retrieve_runtime_webhooks();
if webhooks.len() >= 20 {
return Err(TooManyWebhooks.into());
}
let webhook = Webhook {
url: webhook_settings.url.set().ok_or(MissingUrl(uuid))?,
headers: webhook_settings
.headers
.set()
.map(|h| h.into_iter().map(|(k, v)| (k, v.set().unwrap_or_default())).collect())
.unwrap_or_default(),
};
check_changed(uuid, &webhook)?;
webhooks.insert(uuid, webhook.clone());
index_scheduler.update_runtime_webhooks(webhooks)?;
analytics.publish(PostWebhooksAnalytics, &req);
let response = WebhookWithMetadata::from(uuid, webhook);
debug!(returns = ?response, "Post webhook");
Ok(HttpResponse::Created().json(response))
}
#[utoipa::path(
patch,
path = "/{uuid}",
tag = "Webhooks",
request_body = WebhookSettings,
security(("Bearer" = ["webhooks.update", "webhooks.*", "*"])),
responses(
(status = 200, description = "Webhook updated successfully", body = WebhookWithMetadata, content_type = "application/json", example = json!({
"uuid": "550e8400-e29b-41d4-a716-446655440000",
"url": "https://your.site/on-tasks-completed",
"headers": {
"Authorization": "Bearer a-secret-token"
},
"isEditable": true
})),
(status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json"),
(status = 400, description = "Bad request", body = ResponseError, content_type = "application/json"),
),
params(
("uuid" = Uuid, Path, description = "The universally unique identifier of the webhook")
)
)]
async fn patch_webhook(
index_scheduler: GuardedData<ActionPolicy<{ actions::WEBHOOKS_UPDATE }>, Data<IndexScheduler>>,
uuid: Path<String>,
webhook_settings: AwebJson<WebhookSettings, DeserrJsonError>,
req: HttpRequest,
analytics: Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
let uuid = Uuid::from_str(&uuid.into_inner()).map_err(InvalidUuid)?;
let webhook_settings = webhook_settings.into_inner();
debug!(parameters = ?(uuid, &webhook_settings), "Patch webhook");
if uuid.is_nil() {
return Err(ImmutableWebhook(uuid).into());
}
let mut webhooks = index_scheduler.retrieve_runtime_webhooks();
let old_webhook = webhooks.remove(&uuid).ok_or(WebhookNotFound(uuid))?;
let webhook = patch_webhook_inner(&uuid, old_webhook, webhook_settings)?;
check_changed(uuid, &webhook)?;
webhooks.insert(uuid, webhook.clone());
index_scheduler.update_runtime_webhooks(webhooks)?;
analytics.publish(PatchWebhooksAnalytics, &req);
let response = WebhookWithMetadata::from(uuid, webhook);
debug!(returns = ?response, "Patch webhook");
Ok(HttpResponse::Ok().json(response))
}
#[utoipa::path(
delete,
path = "/{uuid}",
tag = "Webhooks",
security(("Bearer" = ["webhooks.delete", "webhooks.*", "*"])),
responses(
(status = 204, description = "Webhook deleted successfully"),
(status = 404, description = "Webhook not found", body = ResponseError, content_type = "application/json"),
(status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json"),
),
params(
("uuid" = Uuid, Path, description = "The universally unique identifier of the webhook")
)
)]
async fn delete_webhook(
index_scheduler: GuardedData<ActionPolicy<{ actions::WEBHOOKS_DELETE }>, Data<IndexScheduler>>,
uuid: Path<String>,
) -> Result<HttpResponse, ResponseError> {
let uuid = Uuid::from_str(&uuid.into_inner()).map_err(InvalidUuid)?;
debug!(parameters = ?uuid, "Delete webhook");
if uuid.is_nil() {
return Err(ImmutableWebhook(uuid).into());
}
let mut webhooks = index_scheduler.retrieve_runtime_webhooks();
webhooks.remove(&uuid).ok_or(WebhookNotFound(uuid))?;
index_scheduler.update_runtime_webhooks(webhooks)?;
debug!(returns = "No Content", "Delete webhook");
Ok(HttpResponse::NoContent().finish())
}

View File

@@ -10,10 +10,10 @@ use actix_http::StatusCode;
use index_scheduler::{IndexScheduler, RoFeatures};
use itertools::Itertools;
use meilisearch_types::error::ResponseError;
use meilisearch_types::features::{Network, Remote};
use meilisearch_types::milli::order_by_map::OrderByMap;
use meilisearch_types::milli::score_details::{ScoreDetails, WeightedScoreValue};
use meilisearch_types::milli::{self, DocumentId, OrderBy, TimeBudget, DEFAULT_VALUES_PER_FACET};
use meilisearch_types::network::{Network, Remote};
use roaring::RoaringBitmap;
use tokio::task::JoinHandle;

View File

@@ -1,6 +1,6 @@
pub use error::ProxySearchError;
use error::ReqwestErrorWithoutUrl;
use meilisearch_types::features::Remote;
use meilisearch_types::network::Remote;
use rand::Rng as _;
use reqwest::{Client, Response, StatusCode};
use serde::de::DeserializeOwned;

View File

@@ -421,7 +421,7 @@ async fn error_add_api_key_invalid_parameters_actions() {
meili_snap::snapshot!(code, @"400 Bad Request");
meili_snap::snapshot!(meili_snap::json_string!(response, { ".createdAt" => "[ignored]", ".updatedAt" => "[ignored]" }), @r#"
{
"message": "Unknown value `doc.add` at `.actions[0]`: expected one of `*`, `search`, `documents.*`, `documents.add`, `documents.get`, `documents.delete`, `indexes.*`, `indexes.create`, `indexes.get`, `indexes.update`, `indexes.delete`, `indexes.swap`, `tasks.*`, `tasks.cancel`, `tasks.delete`, `tasks.get`, `settings.*`, `settings.get`, `settings.update`, `stats.*`, `stats.get`, `metrics.*`, `metrics.get`, `dumps.*`, `dumps.create`, `snapshots.*`, `snapshots.create`, `version`, `keys.create`, `keys.get`, `keys.update`, `keys.delete`, `experimental.get`, `experimental.update`, `export`, `network.get`, `network.update`, `chatCompletions`, `chats.*`, `chats.get`, `chats.delete`, `chatsSettings.*`, `chatsSettings.get`, `chatsSettings.update`, `*.get`, `webhooks.get`, `webhooks.update`, `webhooks.delete`, `webhooks.create`, `webhooks.*`",
"message": "Unknown value `doc.add` at `.actions[0]`: expected one of `*`, `search`, `documents.*`, `documents.add`, `documents.get`, `documents.delete`, `indexes.*`, `indexes.create`, `indexes.get`, `indexes.update`, `indexes.delete`, `indexes.swap`, `tasks.*`, `tasks.cancel`, `tasks.delete`, `tasks.get`, `settings.*`, `settings.get`, `settings.update`, `stats.*`, `stats.get`, `metrics.*`, `metrics.get`, `dumps.*`, `dumps.create`, `snapshots.*`, `snapshots.create`, `version`, `keys.create`, `keys.get`, `keys.update`, `keys.delete`, `experimental.get`, `experimental.update`, `export`, `network.get`, `network.update`, `chatCompletions`, `chats.*`, `chats.get`, `chats.delete`, `chatsSettings.*`, `chatsSettings.get`, `chatsSettings.update`, `*.get`",
"code": "invalid_api_key_actions",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_api_key_actions"

View File

@@ -304,7 +304,7 @@ async fn access_authorized_stats_restricted_index() {
let (response, code) = index.create(Some("product_id")).await;
assert_eq!(202, code, "{:?}", &response);
let task_id = response["taskUid"].as_u64().unwrap();
server.wait_task(task_id).await;
index.wait_task(task_id).await;
// create key with access on `products` index only.
let content = json!({
@@ -344,7 +344,7 @@ async fn access_authorized_stats_no_index_restriction() {
let (response, code) = index.create(Some("product_id")).await;
assert_eq!(202, code, "{:?}", &response);
let task_id = response["taskUid"].as_u64().unwrap();
server.wait_task(task_id).await;
index.wait_task(task_id).await;
// create key with access on all indexes.
let content = json!({
@@ -384,7 +384,7 @@ async fn list_authorized_indexes_restricted_index() {
let (response, code) = index.create(Some("product_id")).await;
assert_eq!(202, code, "{:?}", &response);
let task_id = response["taskUid"].as_u64().unwrap();
server.wait_task(task_id).await;
index.wait_task(task_id).await;
// create key with access on `products` index only.
let content = json!({
@@ -425,7 +425,7 @@ async fn list_authorized_indexes_no_index_restriction() {
let (response, code) = index.create(Some("product_id")).await;
assert_eq!(202, code, "{:?}", &response);
let task_id = response["taskUid"].as_u64().unwrap();
server.wait_task(task_id).await;
index.wait_task(task_id).await;
// create key with access on all indexes.
let content = json!({
@@ -507,10 +507,10 @@ async fn access_authorized_index_patterns() {
server.use_api_key(MASTER_KEY);
// refer to products_1 with a modified api key.
// refer to products_1 with modified api key.
let index_1 = server.index("products_1");
server.wait_task(task_id).await;
index_1.wait_task(task_id).await;
let (response, code) = index_1.get_task(task_id).await;
assert_eq!(200, code, "{:?}", &response);
@@ -578,19 +578,19 @@ async fn raise_error_non_authorized_index_patterns() {
assert_eq!(202, code, "{:?}", &response);
let task2_id = response["taskUid"].as_u64().unwrap();
// Adding a document to test index. Should Fail with 403 -- invalid_api_key
// Adding document to test index. Should Fail with 403 -- invalid_api_key
let (response, code) = test_index.add_documents(documents, None).await;
assert_eq!(403, code, "{:?}", &response);
server.use_api_key(MASTER_KEY);
// refer to products_1 with a modified api key.
// refer to products_1 with modified api key.
let product_1_index = server.index("products_1");
// refer to products_2 with a modified api key.
// let product_2_index = server.index("products_2");
// refer to products_2 with modified api key.
let product_2_index = server.index("products_2");
server.wait_task(task1_id).await;
server.wait_task(task2_id).await;
product_1_index.wait_task(task1_id).await;
product_2_index.wait_task(task2_id).await;
let (response, code) = product_1_index.get_task(task1_id).await;
assert_eq!(200, code, "{:?}", &response);
@@ -603,7 +603,7 @@ async fn raise_error_non_authorized_index_patterns() {
#[actix_rt::test]
async fn pattern_indexes() {
// Create a server with master key
// Create server with master key
let mut server = Server::new_auth().await;
server.use_admin_key(MASTER_KEY).await;
@@ -650,7 +650,7 @@ async fn list_authorized_tasks_restricted_index() {
let (response, code) = index.create(Some("product_id")).await;
assert_eq!(202, code, "{:?}", &response);
let task_id = response["taskUid"].as_u64().unwrap();
server.wait_task(task_id).await;
index.wait_task(task_id).await;
// create key with access on `products` index only.
let content = json!({
@@ -690,7 +690,7 @@ async fn list_authorized_tasks_no_index_restriction() {
let (response, code) = index.create(Some("product_id")).await;
assert_eq!(202, code, "{:?}", &response);
let task_id = response["taskUid"].as_u64().unwrap();
server.wait_task(task_id).await;
index.wait_task(task_id).await;
// create key with access on all indexes.
let content = json!({
@@ -757,7 +757,7 @@ async fn error_creating_index_without_action() {
assert_eq!(202, code, "{:?}", &response);
let task_id = response["taskUid"].as_u64().unwrap();
let response = server.wait_task(task_id).await;
let response = index.wait_task(task_id).await;
assert_eq!(response["status"], "failed");
assert_eq!(response["error"], expected_error.clone());
@@ -768,7 +768,7 @@ async fn error_creating_index_without_action() {
assert_eq!(202, code, "{:?}", &response);
let task_id = response["taskUid"].as_u64().unwrap();
let response = server.wait_task(task_id).await;
let response = index.wait_task(task_id).await;
assert_eq!(response["status"], "failed");
assert_eq!(response["error"], expected_error.clone());
@@ -778,7 +778,7 @@ async fn error_creating_index_without_action() {
assert_eq!(202, code, "{:?}", &response);
let task_id = response["taskUid"].as_u64().unwrap();
let response = server.wait_task(task_id).await;
let response = index.wait_task(task_id).await;
assert_eq!(response["status"], "failed");
assert_eq!(response["error"], expected_error.clone());
@@ -830,7 +830,7 @@ async fn lazy_create_index() {
assert_eq!(202, code, "{:?}", &response);
let task_id = response["taskUid"].as_u64().unwrap();
server.wait_task(task_id).await;
index.wait_task(task_id).await;
let (response, code) = index.get_task(task_id).await;
assert_eq!(200, code, "{:?}", &response);
@@ -844,7 +844,7 @@ async fn lazy_create_index() {
assert_eq!(202, code, "{:?}", &response);
let task_id = response["taskUid"].as_u64().unwrap();
server.wait_task(task_id).await;
index.wait_task(task_id).await;
let (response, code) = index.get_task(task_id).await;
assert_eq!(200, code, "{:?}", &response);
@@ -856,7 +856,7 @@ async fn lazy_create_index() {
assert_eq!(202, code, "{:?}", &response);
let task_id = response["taskUid"].as_u64().unwrap();
server.wait_task(task_id).await;
index.wait_task(task_id).await;
let (response, code) = index.get_task(task_id).await;
assert_eq!(200, code, "{:?}", &response);
@@ -911,7 +911,7 @@ async fn lazy_create_index_from_pattern() {
assert_eq!(202, code, "{:?}", &response);
let task_id = response["taskUid"].as_u64().unwrap();
server.wait_task(task_id).await;
index.wait_task(task_id).await;
let (response, code) = index.get_task(task_id).await;
assert_eq!(200, code, "{:?}", &response);
@@ -929,7 +929,7 @@ async fn lazy_create_index_from_pattern() {
assert_eq!(202, code, "{:?}", &response);
let task_id = response["taskUid"].as_u64().unwrap();
server.wait_task(task_id).await;
index.wait_task(task_id).await;
let (response, code) = index.get_task(task_id).await;
assert_eq!(200, code, "{:?}", &response);
@@ -949,7 +949,7 @@ async fn lazy_create_index_from_pattern() {
assert_eq!(202, code, "{:?}", &response);
let task_id = response["taskUid"].as_u64().unwrap();
server.wait_task(task_id).await;
index.wait_task(task_id).await;
let (response, code) = index.get_task(task_id).await;
assert_eq!(200, code, "{:?}", &response);

View File

@@ -93,7 +93,7 @@ async fn create_api_key_bad_actions() {
snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r#"
{
"message": "Unknown value `doggo` at `.actions[0]`: expected one of `*`, `search`, `documents.*`, `documents.add`, `documents.get`, `documents.delete`, `indexes.*`, `indexes.create`, `indexes.get`, `indexes.update`, `indexes.delete`, `indexes.swap`, `tasks.*`, `tasks.cancel`, `tasks.delete`, `tasks.get`, `settings.*`, `settings.get`, `settings.update`, `stats.*`, `stats.get`, `metrics.*`, `metrics.get`, `dumps.*`, `dumps.create`, `snapshots.*`, `snapshots.create`, `version`, `keys.create`, `keys.get`, `keys.update`, `keys.delete`, `experimental.get`, `experimental.update`, `export`, `network.get`, `network.update`, `chatCompletions`, `chats.*`, `chats.get`, `chats.delete`, `chatsSettings.*`, `chatsSettings.get`, `chatsSettings.update`, `*.get`, `webhooks.get`, `webhooks.update`, `webhooks.delete`, `webhooks.create`, `webhooks.*`",
"message": "Unknown value `doggo` at `.actions[0]`: expected one of `*`, `search`, `documents.*`, `documents.add`, `documents.get`, `documents.delete`, `indexes.*`, `indexes.create`, `indexes.get`, `indexes.update`, `indexes.delete`, `indexes.swap`, `tasks.*`, `tasks.cancel`, `tasks.delete`, `tasks.get`, `settings.*`, `settings.get`, `settings.update`, `stats.*`, `stats.get`, `metrics.*`, `metrics.get`, `dumps.*`, `dumps.create`, `snapshots.*`, `snapshots.create`, `version`, `keys.create`, `keys.get`, `keys.update`, `keys.delete`, `experimental.get`, `experimental.update`, `export`, `network.get`, `network.update`, `chatCompletions`, `chats.*`, `chats.get`, `chats.delete`, `chatsSettings.*`, `chatsSettings.get`, `chatsSettings.update`, `*.get`",
"code": "invalid_api_key_actions",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_api_key_actions"

View File

@@ -100,11 +100,11 @@ macro_rules! compute_authorized_search {
let index = server.index("sales");
let documents = DOCUMENTS.clone();
let (task1,_status_code) = index.add_documents(documents, None).await;
server.wait_task(task1.uid()).await.succeeded();
index.wait_task(task1.uid()).await.succeeded();
let (task2,_status_code) = index
.update_settings(json!({"filterableAttributes": ["color"]}))
.await;
server.wait_task(task2.uid()).await.succeeded();
index.wait_task(task2.uid()).await.succeeded();
drop(index);
for key_content in ACCEPTED_KEYS.iter() {
@@ -147,7 +147,7 @@ macro_rules! compute_forbidden_search {
let index = server.index("sales");
let documents = DOCUMENTS.clone();
let (task, _status_code) = index.add_documents(documents, None).await;
server.wait_task(task.uid()).await.succeeded();
index.wait_task(task.uid()).await.succeeded();
drop(index);
for key_content in $parent_keys.iter() {

View File

@@ -268,21 +268,21 @@ macro_rules! compute_authorized_single_search {
let index = server.index("sales");
let documents = DOCUMENTS.clone();
let (add_task,_status_code) = index.add_documents(documents, None).await;
server.wait_task(add_task.uid()).await.succeeded();
index.wait_task(add_task.uid()).await.succeeded();
let (update_task,_status_code) = index
.update_settings(json!({"filterableAttributes": ["color"]}))
.await;
server.wait_task(update_task.uid()).await.succeeded();
index.wait_task(update_task.uid()).await.succeeded();
drop(index);
let index = server.index("products");
let documents = NESTED_DOCUMENTS.clone();
let (add_task2,_status_code) = index.add_documents(documents, None).await;
server.wait_task(add_task2.uid()).await.succeeded();
index.wait_task(add_task2.uid()).await.succeeded();
let (update_task2,_status_code) = index
.update_settings(json!({"filterableAttributes": ["doggos"]}))
.await;
server.wait_task(update_task2.uid()).await.succeeded();
index.wait_task(update_task2.uid()).await.succeeded();
drop(index);
@@ -339,21 +339,21 @@ macro_rules! compute_authorized_multiple_search {
let index = server.index("sales");
let documents = DOCUMENTS.clone();
let (task,_status_code) = index.add_documents(documents, None).await;
server.wait_task(task.uid()).await.succeeded();
index.wait_task(task.uid()).await.succeeded();
let (task,_status_code) = index
.update_settings(json!({"filterableAttributes": ["color"]}))
.await;
server.wait_task(task.uid()).await.succeeded();
index.wait_task(task.uid()).await.succeeded();
drop(index);
let index = server.index("products");
let documents = NESTED_DOCUMENTS.clone();
let (task,_status_code) = index.add_documents(documents, None).await;
server.wait_task(task.uid()).await.succeeded();
index.wait_task(task.uid()).await.succeeded();
let (task,_status_code) = index
.update_settings(json!({"filterableAttributes": ["doggos"]}))
.await;
server.wait_task(task.uid()).await.succeeded();
index.wait_task(task.uid()).await.succeeded();
drop(index);
@@ -423,21 +423,21 @@ macro_rules! compute_forbidden_single_search {
let index = server.index("sales");
let documents = DOCUMENTS.clone();
let (task,_status_code) = index.add_documents(documents, None).await;
server.wait_task(task.uid()).await.succeeded();
index.wait_task(task.uid()).await.succeeded();
let (task,_status_code) = index
.update_settings(json!({"filterableAttributes": ["color"]}))
.await;
server.wait_task(task.uid()).await.succeeded();
index.wait_task(task.uid()).await.succeeded();
drop(index);
let index = server.index("products");
let documents = NESTED_DOCUMENTS.clone();
let (task,_status_code) = index.add_documents(documents, None).await;
server.wait_task(task.uid()).await.succeeded();
index.wait_task(task.uid()).await.succeeded();
let (task,_status_code) = index
.update_settings(json!({"filterableAttributes": ["doggos"]}))
.await;
server.wait_task(task.uid()).await.succeeded();
index.wait_task(task.uid()).await.succeeded();
drop(index);
assert_eq!($parent_keys.len(), $failed_query_indexes.len(), "keys != query_indexes");
@@ -499,21 +499,21 @@ macro_rules! compute_forbidden_multiple_search {
let index = server.index("sales");
let documents = DOCUMENTS.clone();
let (task,_status_code) = index.add_documents(documents, None).await;
server.wait_task(task.uid()).await.succeeded();
index.wait_task(task.uid()).await.succeeded();
let (task,_status_code) = index
.update_settings(json!({"filterableAttributes": ["color"]}))
.await;
server.wait_task(task.uid()).await.succeeded();
index.wait_task(task.uid()).await.succeeded();
drop(index);
let index = server.index("products");
let documents = NESTED_DOCUMENTS.clone();
let (task,_status_code) = index.add_documents(documents, None).await;
server.wait_task(task.uid()).await.succeeded();
index.wait_task(task.uid()).await.succeeded();
let (task,_status_code) = index
.update_settings(json!({"filterableAttributes": ["doggos"]}))
.await;
server.wait_task(task.uid()).await.succeeded();
index.wait_task(task.uid()).await.succeeded();
drop(index);
assert_eq!($parent_keys.len(), $failed_query_indexes.len(), "keys != query_indexes");

File diff suppressed because it is too large Load Diff

View File

@@ -1,13 +1,15 @@
use std::fmt::Write;
use std::marker::PhantomData;
use std::panic::{catch_unwind, resume_unwind, UnwindSafe};
use std::time::Duration;
use actix_web::http::StatusCode;
use tokio::time::sleep;
use urlencoding::encode as urlencode;
use super::encoder::Encoder;
use super::service::Service;
use super::{Owned, Server, Shared, Value};
use super::{Owned, Shared, Value};
use crate::json;
pub struct Index<'a, State = Owned> {
@@ -31,7 +33,7 @@ impl<'a> Index<'a, Owned> {
Index { uid: self.uid.clone(), service: self.service, encoder, marker: PhantomData }
}
pub async fn load_test_set<State>(&self, waiter: &Server<State>) -> u64 {
pub async fn load_test_set(&self) -> u64 {
let url = format!("/indexes/{}/documents", urlencode(self.uid.as_ref()));
let (response, code) = self
.service
@@ -42,12 +44,12 @@ impl<'a> Index<'a, Owned> {
)
.await;
assert_eq!(code, 202);
let update_id = response["taskUid"].as_u64().unwrap();
waiter.wait_task(update_id).await;
update_id
let update_id = response["taskUid"].as_i64().unwrap();
self.wait_task(update_id as u64).await;
update_id as u64
}
pub async fn load_test_set_ndjson<State>(&self, waiter: &Server<State>) -> u64 {
pub async fn load_test_set_ndjson(&self) -> u64 {
let url = format!("/indexes/{}/documents", urlencode(self.uid.as_ref()));
let (response, code) = self
.service
@@ -58,9 +60,9 @@ impl<'a> Index<'a, Owned> {
)
.await;
assert_eq!(code, 202);
let update_id = response["taskUid"].as_u64().unwrap();
waiter.wait_task(update_id).await;
update_id
let update_id = response["taskUid"].as_i64().unwrap();
self.wait_task(update_id as u64).await;
update_id as u64
}
pub async fn create(&self, primary_key: Option<&str>) -> (Value, StatusCode) {
@@ -249,11 +251,6 @@ impl<'a> Index<'a, Owned> {
self.service.put_encoded(url, settings, self.encoder).await
}
pub async fn update_settings_chat(&self, settings: Value) -> (Value, StatusCode) {
let url = format!("/indexes/{}/settings/chat", urlencode(self.uid.as_ref()));
self.service.patch_encoded(url, settings, self.encoder).await
}
pub async fn delete_settings(&self) -> (Value, StatusCode) {
let url = format!("/indexes/{}/settings", urlencode(self.uid.as_ref()));
self.service.delete(url).await
@@ -270,14 +267,10 @@ impl Index<'_, Shared> {
/// You cannot modify the content of a shared index, thus the delete_document_by_filter call
/// must fail. If the task successfully enqueue itself, we'll wait for the task to finishes,
/// and if it succeed the function will panic.
pub async fn delete_document_by_filter_fail<State>(
&self,
body: Value,
waiter: &Server<State>,
) -> (Value, StatusCode) {
pub async fn delete_document_by_filter_fail(&self, body: Value) -> (Value, StatusCode) {
let (mut task, code) = self._delete_document_by_filter(body).await;
if code.is_success() {
task = waiter.wait_task(task.uid()).await;
task = self.wait_task(task.uid()).await;
if task.is_success() {
panic!(
"`delete_document_by_filter_fail` succeeded: {}",
@@ -288,10 +281,10 @@ impl Index<'_, Shared> {
(task, code)
}
pub async fn delete_index_fail<State>(&self, waiter: &Server<State>) -> (Value, StatusCode) {
pub async fn delete_index_fail(&self) -> (Value, StatusCode) {
let (mut task, code) = self._delete().await;
if code.is_success() {
task = waiter.wait_task(task.uid()).await;
task = self.wait_task(task.uid()).await;
if task.is_success() {
panic!(
"`delete_index_fail` succeeded: {}",
@@ -302,14 +295,10 @@ impl Index<'_, Shared> {
(task, code)
}
pub async fn update_index_fail<State>(
&self,
primary_key: Option<&str>,
waiter: &Server<State>,
) -> (Value, StatusCode) {
pub async fn update_index_fail(&self, primary_key: Option<&str>) -> (Value, StatusCode) {
let (mut task, code) = self._update(primary_key).await;
if code.is_success() {
task = waiter.wait_task(task.uid()).await;
task = self.wait_task(task.uid()).await;
if task.is_success() {
panic!(
"`update_index_fail` succeeded: {}",
@@ -375,6 +364,23 @@ impl<State> Index<'_, State> {
self.service.delete(url).await
}
pub async fn wait_task(&self, update_id: u64) -> Value {
// try several times to get status, or panic to not wait forever
let url = format!("/tasks/{}", update_id);
for _ in 0..100 {
let (response, status_code) = self.service.get(&url).await;
assert_eq!(200, status_code, "response: {}", response);
if response["status"] == "succeeded" || response["status"] == "failed" {
return response;
}
// wait 0.5 second.
sleep(Duration::from_millis(500)).await;
}
panic!("Timeout waiting for update id");
}
pub async fn get_task(&self, update_id: u64) -> (Value, StatusCode) {
let url = format!("/tasks/{}", update_id);
self.service.get(url).await

View File

@@ -3,8 +3,10 @@ pub mod index;
pub mod server;
pub mod service;
use std::collections::BTreeMap;
use std::fmt::{self, Display};
use std::{
collections::BTreeMap,
fmt::{self, Display},
};
use actix_http::StatusCode;
#[allow(unused)]
@@ -15,8 +17,10 @@ use serde::{Deserialize, Serialize};
#[allow(unused)]
pub use server::{default_settings, Server};
use tokio::sync::OnceCell;
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, Request, ResponseTemplate};
use wiremock::{
matchers::{method, path},
Mock, MockServer, Request, ResponseTemplate,
};
use crate::common::index::Index;
@@ -42,15 +46,6 @@ impl Value {
self["uid"].as_u64().is_some() || self["taskUid"].as_u64().is_some()
}
#[track_caller]
pub fn batch_uid(&self) -> u32 {
if let Some(batch_uid) = self["batchUid"].as_u64() {
batch_uid as u32
} else {
panic!("Didn't find `batchUid` in: {self}");
}
}
/// Return `true` if the `status` field is set to `succeeded`.
/// Panic if the `status` field doesn't exists.
#[track_caller]
@@ -194,7 +189,7 @@ pub async fn shared_empty_index() -> &'static Index<'static, Shared> {
let server = Server::new_shared();
let index = server._index("EMPTY_INDEX").to_shared();
let (response, _code) = index._create(None).await;
server.wait_task(response.uid()).await.succeeded();
index.wait_task(response.uid()).await.succeeded();
index
})
.await
@@ -242,13 +237,13 @@ pub async fn shared_index_with_documents() -> &'static Index<'static, Shared> {
let index = server._index("SHARED_DOCUMENTS").to_shared();
let documents = DOCUMENTS.clone();
let (response, _code) = index._add_documents(documents, None).await;
server.wait_task(response.uid()).await.succeeded();
index.wait_task(response.uid()).await.succeeded();
let (response, _code) = index
._update_settings(
json!({"filterableAttributes": ["id", "title"], "sortableAttributes": ["id", "title"]}),
)
.await;
server.wait_task(response.uid()).await.succeeded();
index.wait_task(response.uid()).await.succeeded();
index
}).await
}
@@ -285,13 +280,13 @@ pub async fn shared_index_with_score_documents() -> &'static Index<'static, Shar
let index = server._index("SHARED_SCORE_DOCUMENTS").to_shared();
let documents = SCORE_DOCUMENTS.clone();
let (response, _code) = index._add_documents(documents, None).await;
server.wait_task(response.uid()).await.succeeded();
index.wait_task(response.uid()).await.succeeded();
let (response, _code) = index
._update_settings(
json!({"filterableAttributes": ["id", "title"], "sortableAttributes": ["id", "title"]}),
)
.await;
server.wait_task(response.uid()).await.succeeded();
index.wait_task(response.uid()).await.succeeded();
index
}).await
}
@@ -362,13 +357,13 @@ pub async fn shared_index_with_nested_documents() -> &'static Index<'static, Sha
let index = server._index("SHARED_NESTED_DOCUMENTS").to_shared();
let documents = NESTED_DOCUMENTS.clone();
let (response, _code) = index._add_documents(documents, None).await;
server.wait_task(response.uid()).await.succeeded();
index.wait_task(response.uid()).await.succeeded();
let (response, _code) = index
._update_settings(
json!({"filterableAttributes": ["father", "doggos", "cattos"], "sortableAttributes": ["doggos"]}),
)
.await;
server.wait_task(response.uid()).await.succeeded();
index.wait_task(response.uid()).await.succeeded();
index
}).await
}
@@ -462,7 +457,7 @@ pub async fn shared_index_with_test_set() -> &'static Index<'static, Shared> {
)
.await;
assert_eq!(code, 202);
server.wait_task(response.uid()).await.succeeded();
index.wait_task(response.uid()).await.succeeded();
index
})
.await
@@ -509,14 +504,14 @@ pub async fn shared_index_with_geo_documents() -> &'static Index<'static, Shared
let server = Server::new_shared();
let index = server._index("SHARED_GEO_DOCUMENTS").to_shared();
let (response, _code) = index._add_documents(GEO_DOCUMENTS.clone(), None).await;
server.wait_task(response.uid()).await.succeeded();
index.wait_task(response.uid()).await.succeeded();
let (response, _code) = index
._update_settings(
json!({"filterableAttributes": ["_geo"], "sortableAttributes": ["_geo"]}),
)
.await;
server.wait_task(response.uid()).await.succeeded();
index.wait_task(response.uid()).await.succeeded();
index
})
.await
@@ -614,7 +609,7 @@ pub async fn init_fragments_index() -> (Server<Owned>, String, crate::common::Va
let (value, code) = index.add_documents(documents, None).await;
assert_eq!(code, StatusCode::ACCEPTED);
let _task = server.wait_task(value.uid()).await.succeeded();
let _task = index.wait_task(value.uid()).await.succeeded();
let uid = index.uid.clone();
(server, uid, settings)
@@ -679,7 +674,7 @@ pub async fn init_fragments_index_composite() -> (Server<Owned>, String, crate::
let (value, code) = index.add_documents(documents, None).await;
assert_eq!(code, StatusCode::ACCEPTED);
server.wait_task(value.uid()).await.succeeded();
index.wait_task(value.uid()).await.succeeded();
let uid = index.uid.clone();
(server, uid, settings)

View File

@@ -182,25 +182,6 @@ impl Server<Owned> {
self.service.patch("/network", value).await
}
pub async fn create_webhook(&self, value: Value) -> (Value, StatusCode) {
self.service.post("/webhooks", value).await
}
pub async fn get_webhook(&self, uuid: impl AsRef<str>) -> (Value, StatusCode) {
let url = format!("/webhooks/{}", uuid.as_ref());
self.service.get(url).await
}
pub async fn delete_webhook(&self, uuid: impl AsRef<str>) -> (Value, StatusCode) {
let url = format!("/webhooks/{}", uuid.as_ref());
self.service.delete(url).await
}
pub async fn patch_webhook(&self, uuid: impl AsRef<str>, value: Value) -> (Value, StatusCode) {
let url = format!("/webhooks/{}", uuid.as_ref());
self.service.patch(url, value).await
}
pub async fn get_metrics(&self) -> (Value, StatusCode) {
self.service.get("/metrics").await
}
@@ -428,12 +409,12 @@ impl<State> Server<State> {
pub async fn wait_task(&self, update_id: u64) -> Value {
// try several times to get status, or panic to not wait forever
let url = format!("/tasks/{update_id}");
let max_attempts = 400; // 200 seconds in total, 0.5secs per attempt
let url = format!("/tasks/{}", update_id);
let max_attempts = 400; // 200 seconds total, 0.5s per attempt
for i in 0..max_attempts {
let (response, status_code) = self.service.get(url.clone()).await;
assert_eq!(200, status_code, "response: {response}");
let (response, status_code) = self.service.get(&url).await;
assert_eq!(200, status_code, "response: {}", response);
if response["status"] == "succeeded" || response["status"] == "failed" {
return response;
@@ -466,10 +447,6 @@ impl<State> Server<State> {
pub async fn get_network(&self) -> (Value, StatusCode) {
self.service.get("/network").await
}
pub async fn get_webhooks(&self) -> (Value, StatusCode) {
self.service.get("/webhooks").await
}
}
pub fn default_settings(dir: impl AsRef<Path>) -> Opt {

View File

@@ -1318,7 +1318,7 @@ async fn add_no_documents() {
async fn add_larger_dataset() {
let server = Server::new_shared();
let index = server.unique_index();
let update_id = index.load_test_set(server).await;
let update_id = index.load_test_set().await;
let (response, code) = index.get_task(update_id).await;
assert_eq!(code, 200);
assert_eq!(response["status"], "succeeded");
@@ -1333,7 +1333,7 @@ async fn add_larger_dataset() {
// x-ndjson add large test
let index = server.unique_index();
let update_id = index.load_test_set_ndjson(server).await;
let update_id = index.load_test_set_ndjson().await;
let (response, code) = index.get_task(update_id).await;
assert_eq!(code, 200);
assert_eq!(response["status"], "succeeded");

View File

@@ -7,8 +7,7 @@ use crate::json;
async fn delete_one_document_unexisting_index() {
let server = Server::new_shared();
let index = shared_does_not_exists_index().await;
let (task, code) =
index.delete_document_by_filter_fail(json!({"filter": "a = b"}), server).await;
let (task, code) = index.delete_document_by_filter_fail(json!({"filter": "a = b"})).await;
assert_eq!(code, 202);
server.wait_task(task.uid()).await.failed();

View File

@@ -559,7 +559,7 @@ async fn delete_document_by_filter() {
let index = shared_does_not_exists_index().await;
// index does not exists
let (response, _code) =
index.delete_document_by_filter_fail(json!({ "filter": "doggo = bernese"}), server).await;
index.delete_document_by_filter_fail(json!({ "filter": "doggo = bernese"})).await;
snapshot!(response, @r###"
{
"uid": "[uid]",
@@ -589,7 +589,7 @@ async fn delete_document_by_filter() {
// no filterable are set
let index = shared_empty_index().await;
let (response, _code) =
index.delete_document_by_filter_fail(json!({ "filter": "doggo = bernese"}), server).await;
index.delete_document_by_filter_fail(json!({ "filter": "doggo = bernese"})).await;
snapshot!(response, @r###"
{
"uid": "[uid]",
@@ -619,7 +619,7 @@ async fn delete_document_by_filter() {
// not filterable while there is a filterable attribute
let index = shared_index_with_documents().await;
let (response, code) =
index.delete_document_by_filter_fail(json!({ "filter": "catto = jorts"}), server).await;
index.delete_document_by_filter_fail(json!({ "filter": "catto = jorts"})).await;
snapshot!(code, @"202 Accepted");
let response = server.wait_task(response.uid()).await.failed();
snapshot!(response, @r###"

View File

@@ -87,7 +87,7 @@ async fn get_document() {
async fn get_document_sorted() {
let server = Server::new_shared();
let index = server.unique_index();
index.load_test_set(server).await;
index.load_test_set().await;
let (task, _status_code) =
index.update_settings_sortable_attributes(json!(["age", "email", "gender", "name"])).await;
@@ -639,7 +639,7 @@ async fn get_document_s_nested_attributes_to_retrieve() {
async fn get_documents_displayed_attributes_is_ignored() {
let server = Server::new_shared();
let index = server.unique_index();
index.load_test_set(server).await;
index.load_test_set().await;
index.update_settings(json!({"displayedAttributes": ["gender"]})).await;
let (response, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await;

View File

@@ -2366,7 +2366,7 @@ async fn generate_and_import_dump_containing_vectors() {
))
.await;
snapshot!(code, @"202 Accepted");
let response = server.wait_task(response.uid()).await;
let response = index.wait_task(response.uid()).await;
snapshot!(response);
let (response, code) = index
.add_documents(
@@ -2381,12 +2381,12 @@ async fn generate_and_import_dump_containing_vectors() {
)
.await;
snapshot!(code, @"202 Accepted");
let response = server.wait_task(response.uid()).await;
let response = index.wait_task(response.uid()).await;
snapshot!(response);
let (response, code) = server.create_dump().await;
snapshot!(code, @"202 Accepted");
let response = server.wait_task(response.uid()).await;
let response = index.wait_task(response.uid()).await;
snapshot!(response["status"], @r###""succeeded""###);
// ========= We made a dump, now we should clear the DB and try to import our dump

View File

@@ -161,9 +161,9 @@ async fn test_create_multiple_indexes() {
let (task2, _) = index2.create(None).await;
let (task3, _) = index3.create(None).await;
server.wait_task(task1.uid()).await.succeeded();
server.wait_task(task2.uid()).await.succeeded();
server.wait_task(task3.uid()).await.succeeded();
index1.wait_task(task1.uid()).await.succeeded();
index2.wait_task(task2.uid()).await.succeeded();
index3.wait_task(task3.uid()).await.succeeded();
assert_eq!(index1.get().await.1, 200);
assert_eq!(index2.get().await.1, 200);

View File

@@ -26,7 +26,7 @@ async fn create_and_delete_index() {
async fn error_delete_unexisting_index() {
let server = Server::new_shared();
let index = shared_does_not_exists_index().await;
let (task, code) = index.delete_index_fail(server).await;
let (task, code) = index.delete_index_fail().await;
assert_eq!(code, 202);
server.wait_task(task.uid()).await.failed();

View File

@@ -60,8 +60,8 @@ async fn list_multiple_indexes() {
let index_with_key = server.unique_index();
let (response_with_key, _status_code) = index_with_key.create(Some("key")).await;
server.wait_task(response_without_key.uid()).await.succeeded();
server.wait_task(response_with_key.uid()).await.succeeded();
index_without_key.wait_task(response_without_key.uid()).await.succeeded();
index_with_key.wait_task(response_with_key.uid()).await.succeeded();
let (response, code) = server.list_indexes(None, Some(1000)).await;
assert_eq!(code, 200);
@@ -81,9 +81,8 @@ async fn get_and_paginate_indexes() {
let server = Server::new().await;
const NB_INDEXES: usize = 50;
for i in 0..NB_INDEXES {
let (task, code) = server.index(format!("test_{i:02}")).create(None).await;
assert_eq!(code, 202);
server.wait_task(task.uid()).await;
server.index(format!("test_{i:02}")).create(None).await;
server.index(format!("test_{i:02}")).wait_task(i as u64).await;
}
// basic

View File

@@ -72,7 +72,7 @@ async fn error_update_existing_primary_key() {
let server = Server::new_shared();
let index = shared_index_with_documents().await;
let (update_task, code) = index.update_index_fail(Some("primary"), server).await;
let (update_task, code) = index.update_index_fail(Some("primary")).await;
assert_eq!(code, 202);
let response = server.wait_task(update_task.uid()).await.failed();
@@ -91,7 +91,7 @@ async fn error_update_existing_primary_key() {
async fn error_update_unexisting_index() {
let server = Server::new_shared();
let index = shared_does_not_exists_index().await;
let (task, code) = index.update_index_fail(Some("my-primary-key"), server).await;
let (task, code) = index.update_index_fail(Some("my-primary-key")).await;
assert_eq!(code, 202);

View File

@@ -46,7 +46,7 @@ async fn errors_on_param() {
meili_snap::snapshot!(code, @"400 Bad Request");
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"message": "Unknown field `selfie`: expected one of `remotes`, `self`",
"message": "Unknown field `selfie`: expected one of `remotes`, `self`, `sharding`",
"code": "bad_request",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#bad_request"
@@ -149,7 +149,7 @@ async fn errors_on_param() {
meili_snap::snapshot!(code, @"400 Bad Request");
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"message": "Unknown field `doggo` inside `.remotes.new`: expected one of `url`, `searchApiKey`",
"message": "Unknown field `doggo` inside `.remotes.new`: expected one of `url`, `searchApiKey`, `writeApiKey`",
"code": "invalid_network_remotes",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#invalid_network_remotes"
@@ -192,9 +192,11 @@ async fn errors_on_param() {
"remotes": {
"kefir": {
"url": "http://localhost:7700",
"searchApiKey": null
"searchApiKey": null,
"writeApiKey": null
}
}
},
"sharding": false
}
"###);
let (response, code) = server
@@ -266,7 +268,8 @@ async fn auth() {
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"self": "master",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
@@ -274,11 +277,12 @@ async fn auth() {
meili_snap::snapshot!(code, @"200 OK");
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"self": "master",
"remotes": {}
}
"###);
{
"self": "master",
"remotes": {},
"sharding": false
}
"###);
// try get with get permission
server.use_api_key(get_network_key.as_str().unwrap());
@@ -286,11 +290,12 @@ async fn auth() {
meili_snap::snapshot!(code, @"200 OK");
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"self": "master",
"remotes": {}
}
"###);
{
"self": "master",
"remotes": {},
"sharding": false
}
"###);
// try update with update permission
server.use_api_key(update_network_key.as_str().unwrap());
@@ -303,11 +308,12 @@ async fn auth() {
meili_snap::snapshot!(code, @"200 OK");
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"self": "api_key",
"remotes": {}
}
"###);
{
"self": "api_key",
"remotes": {},
"sharding": false
}
"###);
// try with the other's permission
let (response, code) = server.get_network().await;
@@ -383,7 +389,8 @@ async fn get_and_set_network() {
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"self": null,
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
@@ -393,7 +400,8 @@ async fn get_and_set_network() {
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"self": "myself",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
@@ -417,13 +425,16 @@ async fn get_and_set_network() {
"remotes": {
"myself": {
"url": "http://localhost:7700",
"searchApiKey": null
"searchApiKey": null,
"writeApiKey": null
},
"thy": {
"url": "http://localhost:7701",
"searchApiKey": "foo"
"searchApiKey": "foo",
"writeApiKey": null
}
}
},
"sharding": false
}
"###);
@@ -443,13 +454,16 @@ async fn get_and_set_network() {
"remotes": {
"myself": {
"url": "http://localhost:7700",
"searchApiKey": null
"searchApiKey": null,
"writeApiKey": null
},
"thy": {
"url": "http://localhost:7701",
"searchApiKey": "bar"
"searchApiKey": "bar",
"writeApiKey": null
}
}
},
"sharding": false
}
"###);
@@ -470,17 +484,21 @@ async fn get_and_set_network() {
"remotes": {
"myself": {
"url": "http://localhost:7700",
"searchApiKey": null
"searchApiKey": null,
"writeApiKey": null
},
"them": {
"url": "http://localhost:7702",
"searchApiKey": "baz"
"searchApiKey": "baz",
"writeApiKey": null
},
"thy": {
"url": "http://localhost:7701",
"searchApiKey": "bar"
"searchApiKey": "bar",
"writeApiKey": null
}
}
},
"sharding": false
}
"###);
@@ -498,13 +516,16 @@ async fn get_and_set_network() {
"remotes": {
"them": {
"url": "http://localhost:7702",
"searchApiKey": "baz"
"searchApiKey": "baz",
"writeApiKey": null
},
"thy": {
"url": "http://localhost:7701",
"searchApiKey": "bar"
"searchApiKey": "bar",
"writeApiKey": null
}
}
},
"sharding": false
}
"###);
@@ -518,13 +539,16 @@ async fn get_and_set_network() {
"remotes": {
"them": {
"url": "http://localhost:7702",
"searchApiKey": "baz"
"searchApiKey": "baz",
"writeApiKey": null
},
"thy": {
"url": "http://localhost:7701",
"searchApiKey": "bar"
"searchApiKey": "bar",
"writeApiKey": null
}
}
},
"sharding": false
}
"###);
@@ -538,13 +562,16 @@ async fn get_and_set_network() {
"remotes": {
"them": {
"url": "http://localhost:7702",
"searchApiKey": "baz"
"searchApiKey": "baz",
"writeApiKey": null
},
"thy": {
"url": "http://localhost:7701",
"searchApiKey": "bar"
"searchApiKey": "bar",
"writeApiKey": null
}
}
},
"sharding": false
}
"###);
@@ -553,60 +580,69 @@ async fn get_and_set_network() {
meili_snap::snapshot!(code, @"200 OK");
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"self": "thy",
"remotes": {
"them": {
"url": "http://localhost:7702",
"searchApiKey": "baz"
},
"thy": {
"url": "http://localhost:7701",
"searchApiKey": "bar"
}
}
{
"self": "thy",
"remotes": {
"them": {
"url": "http://localhost:7702",
"searchApiKey": "baz",
"writeApiKey": null
},
"thy": {
"url": "http://localhost:7701",
"searchApiKey": "bar",
"writeApiKey": null
}
"###);
},
"sharding": false
}
"###);
// still doing nothing
let (response, code) = server.set_network(json!({"remotes": {}})).await;
meili_snap::snapshot!(code, @"200 OK");
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"self": "thy",
"remotes": {
"them": {
"url": "http://localhost:7702",
"searchApiKey": "baz"
},
"thy": {
"url": "http://localhost:7701",
"searchApiKey": "bar"
}
}
{
"self": "thy",
"remotes": {
"them": {
"url": "http://localhost:7702",
"searchApiKey": "baz",
"writeApiKey": null
},
"thy": {
"url": "http://localhost:7701",
"searchApiKey": "bar",
"writeApiKey": null
}
"###);
},
"sharding": false
}
"###);
// good time to check GET
let (response, code) = server.get_network().await;
meili_snap::snapshot!(code, @"200 OK");
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"self": "thy",
"remotes": {
"them": {
"url": "http://localhost:7702",
"searchApiKey": "baz"
},
"thy": {
"url": "http://localhost:7701",
"searchApiKey": "bar"
}
}
{
"self": "thy",
"remotes": {
"them": {
"url": "http://localhost:7702",
"searchApiKey": "baz",
"writeApiKey": null
},
"thy": {
"url": "http://localhost:7701",
"searchApiKey": "bar",
"writeApiKey": null
}
"###);
},
"sharding": false
}
"###);
// deleting everything
let (response, code) = server
@@ -619,7 +655,8 @@ async fn get_and_set_network() {
meili_snap::snapshot!(meili_snap::json_string!(response), @r###"
{
"self": "thy",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
}

View File

@@ -1270,27 +1270,27 @@ async fn search_with_contains_without_enabling_the_feature() {
index
.search(json!({ "filter": "doggo CONTAINS kefir" }), |response, code| {
snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r#"
snapshot!(json_string!(response), @r###"
{
"message": "Using `CONTAINS` in a filter requires enabling the `contains filter` experimental feature. See https://github.com/orgs/meilisearch/discussions/763\n7:15 doggo CONTAINS kefir",
"message": "Using `CONTAINS` or `STARTS WITH` in a filter requires enabling the `contains filter` experimental feature. See https://github.com/orgs/meilisearch/discussions/763\n7:15 doggo CONTAINS kefir",
"code": "feature_not_enabled",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#feature_not_enabled"
}
"#);
"###);
})
.await;
index
.search(json!({ "filter": "doggo != echo AND doggo CONTAINS kefir" }), |response, code| {
snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r#"
snapshot!(json_string!(response), @r###"
{
"message": "Using `CONTAINS` in a filter requires enabling the `contains filter` experimental feature. See https://github.com/orgs/meilisearch/discussions/763\n25:33 doggo != echo AND doggo CONTAINS kefir",
"message": "Using `CONTAINS` or `STARTS WITH` in a filter requires enabling the `contains filter` experimental feature. See https://github.com/orgs/meilisearch/discussions/763\n25:33 doggo != echo AND doggo CONTAINS kefir",
"code": "feature_not_enabled",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#feature_not_enabled"
}
"#);
"###);
})
.await;
@@ -1299,24 +1299,24 @@ async fn search_with_contains_without_enabling_the_feature() {
index.search_post(json!({ "filter": ["doggo != echo", "doggo CONTAINS kefir"] })).await;
snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r#"
snapshot!(json_string!(response), @r###"
{
"message": "Using `CONTAINS` in a filter requires enabling the `contains filter` experimental feature. See https://github.com/orgs/meilisearch/discussions/763\n7:15 doggo CONTAINS kefir",
"message": "Using `CONTAINS` or `STARTS WITH` in a filter requires enabling the `contains filter` experimental feature. See https://github.com/orgs/meilisearch/discussions/763\n7:15 doggo CONTAINS kefir",
"code": "feature_not_enabled",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#feature_not_enabled"
}
"#);
"###);
let (response, code) =
index.search_post(json!({ "filter": ["doggo != echo", ["doggo CONTAINS kefir"]] })).await;
snapshot!(code, @"400 Bad Request");
snapshot!(json_string!(response), @r#"
snapshot!(json_string!(response), @r###"
{
"message": "Using `CONTAINS` in a filter requires enabling the `contains filter` experimental feature. See https://github.com/orgs/meilisearch/discussions/763\n7:15 doggo CONTAINS kefir",
"message": "Using `CONTAINS` or `STARTS WITH` in a filter requires enabling the `contains filter` experimental feature. See https://github.com/orgs/meilisearch/discussions/763\n7:15 doggo CONTAINS kefir",
"code": "feature_not_enabled",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#feature_not_enabled"
}
"#);
"###);
}

View File

@@ -131,7 +131,8 @@ async fn remote_sharding() {
snapshot!(json_string!(response), @r###"
{
"self": "ms0",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
@@ -139,7 +140,8 @@ async fn remote_sharding() {
snapshot!(json_string!(response), @r###"
{
"self": "ms1",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
let (response, code) = ms2.set_network(json!({"self": "ms2"})).await;
@@ -147,7 +149,8 @@ async fn remote_sharding() {
snapshot!(json_string!(response), @r###"
{
"self": "ms2",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
@@ -158,11 +161,11 @@ async fn remote_sharding() {
let index1 = ms1.index("test");
let index2 = ms2.index("test");
let (task, _status_code) = index0.add_documents(json!(documents[0..2]), None).await;
ms0.wait_task(task.uid()).await.succeeded();
index0.wait_task(task.uid()).await.succeeded();
let (task, _status_code) = index1.add_documents(json!(documents[2..3]), None).await;
ms1.wait_task(task.uid()).await.succeeded();
index1.wait_task(task.uid()).await.succeeded();
let (task, _status_code) = index2.add_documents(json!(documents[3..5]), None).await;
ms2.wait_task(task.uid()).await.succeeded();
index2.wait_task(task.uid()).await.succeeded();
// wrap servers
let ms0 = Arc::new(ms0);
@@ -436,7 +439,8 @@ async fn error_unregistered_remote() {
snapshot!(json_string!(response), @r###"
{
"self": "ms0",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
@@ -444,7 +448,8 @@ async fn error_unregistered_remote() {
snapshot!(json_string!(response), @r###"
{
"self": "ms1",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
@@ -454,9 +459,9 @@ async fn error_unregistered_remote() {
let index0 = ms0.index("test");
let index1 = ms1.index("test");
let (task, _status_code) = index0.add_documents(json!(documents[0..2]), None).await;
ms0.wait_task(task.uid()).await.succeeded();
index0.wait_task(task.uid()).await.succeeded();
let (task, _status_code) = index1.add_documents(json!(documents[2..3]), None).await;
ms1.wait_task(task.uid()).await.succeeded();
index1.wait_task(task.uid()).await.succeeded();
// wrap servers
let ms0 = Arc::new(ms0);
@@ -554,7 +559,8 @@ async fn error_no_weighted_score() {
snapshot!(json_string!(response), @r###"
{
"self": "ms0",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
@@ -562,7 +568,8 @@ async fn error_no_weighted_score() {
snapshot!(json_string!(response), @r###"
{
"self": "ms1",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
@@ -572,9 +579,9 @@ async fn error_no_weighted_score() {
let index0 = ms0.index("test");
let index1 = ms1.index("test");
let (task, _status_code) = index0.add_documents(json!(documents[0..2]), None).await;
ms0.wait_task(task.uid()).await.succeeded();
index0.wait_task(task.uid()).await.succeeded();
let (task, _status_code) = index1.add_documents(json!(documents[2..3]), None).await;
ms1.wait_task(task.uid()).await.succeeded();
index1.wait_task(task.uid()).await.succeeded();
// wrap servers
let ms0 = Arc::new(ms0);
@@ -687,7 +694,8 @@ async fn error_bad_response() {
snapshot!(json_string!(response), @r###"
{
"self": "ms0",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
@@ -695,7 +703,8 @@ async fn error_bad_response() {
snapshot!(json_string!(response), @r###"
{
"self": "ms1",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
@@ -705,9 +714,9 @@ async fn error_bad_response() {
let index0 = ms0.index("test");
let index1 = ms1.index("test");
let (task, _status_code) = index0.add_documents(json!(documents[0..2]), None).await;
ms0.wait_task(task.uid()).await.succeeded();
index0.wait_task(task.uid()).await.succeeded();
let (task, _status_code) = index1.add_documents(json!(documents[2..3]), None).await;
ms1.wait_task(task.uid()).await.succeeded();
index1.wait_task(task.uid()).await.succeeded();
// wrap servers
let ms0 = Arc::new(ms0);
@@ -824,7 +833,8 @@ async fn error_bad_request() {
snapshot!(json_string!(response), @r###"
{
"self": "ms0",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
@@ -832,7 +842,8 @@ async fn error_bad_request() {
snapshot!(json_string!(response), @r###"
{
"self": "ms1",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
@@ -842,9 +853,9 @@ async fn error_bad_request() {
let index0 = ms0.index("test");
let index1 = ms1.index("test");
let (task, _status_code) = index0.add_documents(json!(documents[0..2]), None).await;
ms0.wait_task(task.uid()).await.succeeded();
index0.wait_task(task.uid()).await.succeeded();
let (task, _status_code) = index1.add_documents(json!(documents[2..3]), None).await;
ms1.wait_task(task.uid()).await.succeeded();
index1.wait_task(task.uid()).await.succeeded();
// wrap servers
let ms0 = Arc::new(ms0);
@@ -954,7 +965,8 @@ async fn error_bad_request_facets_by_index() {
snapshot!(json_string!(response), @r###"
{
"self": "ms0",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
@@ -962,7 +974,8 @@ async fn error_bad_request_facets_by_index() {
snapshot!(json_string!(response), @r###"
{
"self": "ms1",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
@@ -972,10 +985,10 @@ async fn error_bad_request_facets_by_index() {
let index0 = ms0.index("test0");
let index1 = ms1.index("test1");
let (task, _status_code) = index0.add_documents(json!(documents[0..2]), None).await;
ms0.wait_task(task.uid()).await.succeeded();
index0.wait_task(task.uid()).await.succeeded();
let (task, _status_code) = index1.add_documents(json!(documents[2..3]), None).await;
ms1.wait_task(task.uid()).await.succeeded();
index1.wait_task(task.uid()).await.succeeded();
// wrap servers
let ms0 = Arc::new(ms0);
@@ -1095,7 +1108,8 @@ async fn error_bad_request_facets_by_index_facet() {
snapshot!(json_string!(response), @r###"
{
"self": "ms0",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
@@ -1103,7 +1117,8 @@ async fn error_bad_request_facets_by_index_facet() {
snapshot!(json_string!(response), @r###"
{
"self": "ms1",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
@@ -1113,13 +1128,13 @@ async fn error_bad_request_facets_by_index_facet() {
let index0 = ms0.index("test");
let index1 = ms1.index("test");
let (task, _status_code) = index0.add_documents(json!(documents[0..2]), None).await;
ms0.wait_task(task.uid()).await.succeeded();
index0.wait_task(task.uid()).await.succeeded();
let (task, _status_code) = index0.update_settings_filterable_attributes(json!(["id"])).await;
ms0.wait_task(task.uid()).await.succeeded();
index0.wait_task(task.uid()).await.succeeded();
let (task, _status_code) = index1.add_documents(json!(documents[2..3]), None).await;
ms1.wait_task(task.uid()).await.succeeded();
index1.wait_task(task.uid()).await.succeeded();
// wrap servers
let ms0 = Arc::new(ms0);
@@ -1224,7 +1239,6 @@ async fn error_bad_request_facets_by_index_facet() {
}
#[actix_rt::test]
#[ignore]
async fn error_remote_does_not_answer() {
let ms0 = Server::new().await;
let ms1 = Server::new().await;
@@ -1245,7 +1259,8 @@ async fn error_remote_does_not_answer() {
snapshot!(json_string!(response), @r###"
{
"self": "ms0",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
@@ -1253,7 +1268,8 @@ async fn error_remote_does_not_answer() {
snapshot!(json_string!(response), @r###"
{
"self": "ms1",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
@@ -1263,9 +1279,9 @@ async fn error_remote_does_not_answer() {
let index0 = ms0.index("test");
let index1 = ms1.index("test");
let (task, _status_code) = index0.add_documents(json!(documents[0..2]), None).await;
ms0.wait_task(task.uid()).await.succeeded();
index0.wait_task(task.uid()).await.succeeded();
let (task, _status_code) = index1.add_documents(json!(documents[2..3]), None).await;
ms1.wait_task(task.uid()).await.succeeded();
index1.wait_task(task.uid()).await.succeeded();
// wrap servers
let ms0 = Arc::new(ms0);
@@ -1446,7 +1462,8 @@ async fn error_remote_404() {
snapshot!(json_string!(response), @r###"
{
"self": "ms0",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
@@ -1454,7 +1471,8 @@ async fn error_remote_404() {
snapshot!(json_string!(response), @r###"
{
"self": "ms1",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
@@ -1464,9 +1482,9 @@ async fn error_remote_404() {
let index0 = ms0.index("test");
let index1 = ms1.index("test");
let (task, _status_code) = index0.add_documents(json!(documents[0..2]), None).await;
ms0.wait_task(task.uid()).await.succeeded();
index0.wait_task(task.uid()).await.succeeded();
let (task, _status_code) = index1.add_documents(json!(documents[2..3]), None).await;
ms1.wait_task(task.uid()).await.succeeded();
index1.wait_task(task.uid()).await.succeeded();
// wrap servers
let ms0 = Arc::new(ms0);
@@ -1641,7 +1659,8 @@ async fn error_remote_sharding_auth() {
snapshot!(json_string!(response), @r###"
{
"self": "ms0",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
@@ -1649,7 +1668,8 @@ async fn error_remote_sharding_auth() {
snapshot!(json_string!(response), @r###"
{
"self": "ms1",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
@@ -1659,9 +1679,9 @@ async fn error_remote_sharding_auth() {
let index0 = ms0.index("test");
let index1 = ms1.index("test");
let (task, _status_code) = index0.add_documents(json!(documents[0..2]), None).await;
ms0.wait_task(task.uid()).await.succeeded();
index0.wait_task(task.uid()).await.succeeded();
let (task, _status_code) = index1.add_documents(json!(documents[2..3]), None).await;
ms1.wait_task(task.uid()).await.succeeded();
index1.wait_task(task.uid()).await.succeeded();
// wrap servers
ms1.clear_api_key();
@@ -1801,7 +1821,8 @@ async fn remote_sharding_auth() {
snapshot!(json_string!(response), @r###"
{
"self": "ms0",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
@@ -1809,7 +1830,8 @@ async fn remote_sharding_auth() {
snapshot!(json_string!(response), @r###"
{
"self": "ms1",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
@@ -1819,9 +1841,9 @@ async fn remote_sharding_auth() {
let index0 = ms0.index("test");
let index1 = ms1.index("test");
let (task, _status_code) = index0.add_documents(json!(documents[0..2]), None).await;
ms0.wait_task(task.uid()).await.succeeded();
index0.wait_task(task.uid()).await.succeeded();
let (task, _status_code) = index1.add_documents(json!(documents[2..3]), None).await;
ms1.wait_task(task.uid()).await.succeeded();
index1.wait_task(task.uid()).await.succeeded();
// wrap servers
ms1.clear_api_key();
@@ -1956,7 +1978,8 @@ async fn error_remote_500() {
snapshot!(json_string!(response), @r###"
{
"self": "ms0",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
@@ -1964,7 +1987,8 @@ async fn error_remote_500() {
snapshot!(json_string!(response), @r###"
{
"self": "ms1",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
@@ -1974,9 +1998,9 @@ async fn error_remote_500() {
let index0 = ms0.index("test");
let index1 = ms1.index("test");
let (task, _status_code) = index0.add_documents(json!(documents[0..2]), None).await;
ms0.wait_task(task.uid()).await.succeeded();
index0.wait_task(task.uid()).await.succeeded();
let (task, _status_code) = index1.add_documents(json!(documents[2..3]), None).await;
ms1.wait_task(task.uid()).await.succeeded();
index1.wait_task(task.uid()).await.succeeded();
// wrap servers
let ms0 = Arc::new(ms0);
@@ -2135,7 +2159,8 @@ async fn error_remote_500_once() {
snapshot!(json_string!(response), @r###"
{
"self": "ms0",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
let (response, code) = ms1.set_network(json!({"self": "ms1"})).await;
@@ -2143,7 +2168,8 @@ async fn error_remote_500_once() {
snapshot!(json_string!(response), @r###"
{
"self": "ms1",
"remotes": {}
"remotes": {},
"sharding": false
}
"###);
@@ -2153,9 +2179,9 @@ async fn error_remote_500_once() {
let index0 = ms0.index("test");
let index1 = ms1.index("test");
let (task, _status_code) = index0.add_documents(json!(documents[0..2]), None).await;
ms0.wait_task(task.uid()).await.succeeded();
index0.wait_task(task.uid()).await.succeeded();
let (task, _status_code) = index1.add_documents(json!(documents[2..3]), None).await;
ms1.wait_task(task.uid()).await.succeeded();
index1.wait_task(task.uid()).await.succeeded();
// wrap servers
let ms0 = Arc::new(ms0);
@@ -2336,9 +2362,9 @@ async fn error_remote_timeout() {
let index0 = ms0.index("test");
let index1 = ms1.index("test");
let (task, _status_code) = index0.add_documents(json!(documents[0..2]), None).await;
ms0.wait_task(task.uid()).await.succeeded();
index0.wait_task(task.uid()).await.succeeded();
let (task, _status_code) = index1.add_documents(json!(documents[2..3]), None).await;
ms1.wait_task(task.uid()).await.succeeded();
index1.wait_task(task.uid()).await.succeeded();
// wrap servers
let ms0 = Arc::new(ms0);

View File

@@ -1,66 +0,0 @@
use crate::common::Server;
use crate::json;
use meili_snap::{json_string, snapshot};
#[actix_rt::test]
async fn set_reset_chat_issue_5772() {
let server = Server::new().await;
let index = server.unique_index();
let (_, code) = server
.set_features(json!({
"chatCompletions": true,
}))
.await;
snapshot!(code, @r#"200 OK"#);
let (task1, _code) = index.update_settings_chat(json!({
"description": "test!",
"documentTemplate": "{% for field in fields %}{% if field.is_searchable and field.value != nil %}{{ field.name }}: {{ field.value }}\n{% endif %}{% endfor %}",
"documentTemplateMaxBytes": 400,
"searchParameters": {
"limit": 15,
"sort": [],
"attributesToSearchOn": []
}
})).await;
server.wait_task(task1.uid()).await.succeeded();
let (response, _) = index.settings().await;
snapshot!(json_string!(response["chat"]), @r#"
{
"description": "test!",
"documentTemplate": "{% for field in fields %}{% if field.is_searchable and field.value != nil %}{{ field.name }}: {{ field.value }}\n{% endif %}{% endfor %}",
"documentTemplateMaxBytes": 400,
"searchParameters": {
"limit": 15,
"sort": [],
"attributesToSearchOn": []
}
}
"#);
let (task2, _status_code) = index.update_settings_chat(json!({
"description": "test!",
"documentTemplate": "{% for field in fields %}{% if field.is_searchable and field.value != nil %}{{ field.name }}: {{ field.value }}\n{% endif %}{% endfor %}",
"documentTemplateMaxBytes": 400,
"searchParameters": {
"limit": 16
}
})).await;
server.wait_task(task2.uid()).await.succeeded();
let (response, _) = index.settings().await;
snapshot!(json_string!(response["chat"]), @r#"
{
"description": "test!",
"documentTemplate": "{% for field in fields %}{% if field.is_searchable and field.value != nil %}{{ field.name }}: {{ field.value }}\n{% endif %}{% endfor %}",
"documentTemplateMaxBytes": 400,
"searchParameters": {
"limit": 16,
"sort": [],
"attributesToSearchOn": []
}
}
"#);
}

Some files were not shown because too many files have changed in this diff Show More