Compare commits

..

28 Commits

Author SHA1 Message Date
ManyTheFish
f779548d48 Update test 2023-03-13 17:26:08 +01:00
ManyTheFish
492bff4b21 Deactivate Chinese tokenization 2023-03-13 13:54:37 +01:00
bors[bot]
fb1260ee88 Merge #3568 #3569
3568: CI: Fix `publish-aarch64` job that still uses ubuntu-18.04 r=Kerollmops a=curquiza

Fixes #3563 

Main change
- add the usage of the `ubuntu-18.04` container instead of the native `ubuntu-18.04` of GitHub actions: I had to install docker in the container.

Small additional changes
- remove useless `fail-fast` and unused/irrelevant matrix inputs (`build`, `linker`, `os`, `use-cross`...)
- Remove useless step in job

Proof of work with this CI triggered on this current branch: https://github.com/meilisearch/meilisearch/actions/runs/4366233882

3569: Enhance Japanese language detection r=dureuill a=ManyTheFish

# Pull Request

This PR is a prototype and can be tested by downloading [the dedicated docker image](https://hub.docker.com/layers/getmeili/meilisearch/prototype-better-language-detection-0/images/sha256-a12847de00e21a71ab797879fd09777dadcb0881f65b5f810e7d1ed434d116ef?context=explore):

```bash
$ docker pull getmeili/meilisearch:prototype-better-language-detection-0
```

## Context
Some Languages are harder to detect than others, this miss-detection leads to bad tokenization making some words or even documents completely unsearchable. Japanese is the main Language affected and can be detected as Chinese which has a completely different way of tokenization.

A [first iteration has been implemented for v1.1.0](https://github.com/meilisearch/meilisearch/pull/3347) but is an insufficient enhancement to make Japanese work. This first implementation was detecting the Language during the indexing to avoid bad detections during the search.
Unfortunately, some documents (shorter ones) can be wrongly detected as Chinese running bad tokenization for these documents and making possible the detection of Chinese during the search because it has been detected during the indexing.

For instance, a Japanese document `{"id": 1, "name": "東京スカパラダイスオーケストラ"}` is detected as Japanese during indexing, during the search the query `東京` will be detected as Japanese because only Japanese documents have been detected during indexing despite the fact that v1.0.2 would detect it as Chinese.
However if in the dataset there is at least one document containing a field with only Kanjis like:
_A document with only 1 field containing only Kanjis:_
```json
{
 "id":4,
 "name": "東京特許許可局"
}
```
_A document with 1 field containing only Kanjis and 1 field containing several Japanese characters:_
```json
{
 "id":105,
 "name": "東京特許許可局",
 "desc": "日経平均株価は26日 に約8カ月ぶりに2万4000円の心理的な節目を上回った。株高を支える材料のひとつは、自民党総裁選で3選を決めた安倍晋三首相の経済政策への期待だ。恩恵が見込まれるとされる人材サービスや建設株の一角が買われている。ただ思惑が先行して資金が集まっている面 は否めない。実際に政策効果を取り込む企業はどこか、なお未知数だ。"
}
```

Then, in both cases, the field `name` will be detected as Chinese during indexing allowing the search to detect Chinese in queries. Therefore,  the query `東京` will be detected as Chinese and only the two last documents will be retrieved by Meilisearch.

## Technical Approach

The current PR partially fixes these issues by:
1) Adding a check over potential miss-detections and rerunning the extraction of the document forcing the tokenization over the main Languages detected in it.
 >  1) run a first extraction allowing the tokenizer to detect any Language in any Script
 >  2) generate a distribution of tokens by Script and Languages (`script_language`)
 >  3) if for a Script we have a token distribution of one of the Language that is under the threshold, then we rerun the extraction forbidding the tokenizer to detect the marginal Languages
 >  4) the tokenizer will fall back on the other available Languages to tokenize the text. For example, if the Chinese were marginally detected compared to the Japanese on the CJ script, then the second extraction will force Japanese tokenization for CJ text in the document. however, the text on another script like Latin will not be impacted by this restriction.

2) Adding a filtering threshold during the search over Languages that have been marginally detected in documents

## Limits
This PR introduces 2 arbitrary thresholds:
1) during the indexing, a Language is considered miss-detected if the number of detected tokens of this Language is under 10% of the tokens detected in the same Script (Japanese and Chinese are 2 different Languages sharing the "same" script "CJK").
2) during the search, a Language is considered marginal if less than 5% of documents are detected as this Language.

This PR only partially fixes these issues:
-  the query `東京` now find Japanese documents if less than 5% of documents are detected as Chinese.
-  the document with the id `105` containing the Japanese field `desc` but the miss-detected field `name` is now completely detected and tokenized as Japanese and is found with the query `東京`.
-  the document with the id `4` no longer breaks the search Language detection but continues to be detected as a Chinese document and can't be found during the search.

## Related issue
Fixes #3565

## Possible future enhancements
- Change or contribute to the Library used to detect the Language
  - the related issue on Whatlang: https://github.com/greyblake/whatlang-rs/issues/122

Co-authored-by: curquiza <clementine@meilisearch.com>
Co-authored-by: ManyTheFish <many@meilisearch.com>
Co-authored-by: Many the fish <many@meilisearch.com>
2023-03-09 15:34:35 +00:00
bors[bot]
48a51e5cd6 Merge #3577
3577: Avoid fetching an LMDB value with an empty string r=ManyTheFish a=Kerollmops

# Pull Request

## Related issue
Fixes #3574 

## What does this PR do?
This PR fixes a bug where an empty key fetches an entry in the database. LMDB throws an error if an empty or too-long key is used to fetch an entry. This empty string seems to have been generated by the Charabia tokenizer.

Co-authored-by: Clément Renault <clement@meilisearch.com>
2023-03-09 14:35:25 +00:00
ManyTheFish
2f8eb4f54a last PR fixes 2023-03-09 15:34:36 +01:00
Many the fish
dea101e3d9 Update meilisearch/src/routes/indexes/mod.rs
Co-authored-by: Louis Dureuil <louis@meilisearch.com>
2023-03-09 15:17:03 +01:00
Clément Renault
175e8a8495 Fix a diacritic issue 2023-03-09 14:57:47 +01:00
Clément Renault
6da54d0cb6 Add a test to fix a diacritic issue 2023-03-09 14:57:38 +01:00
bors[bot]
667bb87e35 Merge #3541
3541: Add cache on the indexes stats r=dureuill a=irevoire

Fix https://github.com/meilisearch/meilisearch/issues/3540

Co-authored-by: Tamo <tamo@meilisearch.com>
Co-authored-by: Louis Dureuil <louis@meilisearch.com>
2023-03-09 13:32:52 +00:00
ManyTheFish
dff2715ef3 Try removing needless collect 2023-03-09 11:28:10 +01:00
ManyTheFish
5deea631ea fix clippy too many arguments 2023-03-09 11:19:13 +01:00
ManyTheFish
b4b859ec8c Fix typos 2023-03-09 10:58:35 +01:00
curquiza
b99ef3d336 Update CI to still use ubuntu-18 2023-03-08 17:11:36 +01:00
ManyTheFish
7e2fd82e41 Use Language allow list in the highlighter 2023-03-08 12:44:16 +01:00
ManyTheFish
24c0775c67 Change indexing threshold 2023-03-08 12:36:04 +01:00
ManyTheFish
3092cf0448 Fix clippy errors 2023-03-08 10:53:42 +01:00
ManyTheFish
37d4551e8e Add a threshold filtering the Languages allowed to be detected at search time 2023-03-07 19:38:01 +01:00
ManyTheFish
da48506f15 Rerun extraction when language detection might have failed 2023-03-07 18:35:26 +01:00
Louis Dureuil
2f5b9fbbd8 Restore contribution of the index sizes to the db size
- the index size now contributes to the db size even if the index is not authorized
2023-03-07 14:05:27 +01:00
Louis Dureuil
7faa9a22f6 Pass IndexStat by ref in store_stats_of 2023-03-07 14:00:54 +01:00
bors[bot]
370d88f626 Merge #3561
3561: Fix the snapshots permissions on unix system r=irevoire a=irevoire

# Pull Request

## Related issue
Fixes https://github.com/meilisearch/meilisearch/issues/3507

The snapshot permissions were wrong after the v0.30 and the huge refacto of the index scheduler.
Fix this issue + add a test on the permissions on unix

Co-authored-by: Tamo <tamo@meilisearch.com>
2023-03-07 08:51:38 +00:00
Tamo
d34faa8f9c put back the sleep as it was and fix the from 2023-03-06 18:09:09 +01:00
Tamo
e5d0bef6d8 update a comment 2023-03-06 17:04:24 +01:00
Louis Dureuil
76288fad72 Fix snapshots 2023-03-06 16:57:31 +01:00
Louis Dureuil
076a3d371c Eagerly compute stats as fallback to the cache.
- Refactor all around to avoid spawning indexes more times than necessary
2023-03-06 16:57:31 +01:00
Tamo
3bbf760542 update most snapshots 2023-03-06 16:57:31 +01:00
Tamo
fd5c48941a Add cache on the indexes stats 2023-03-06 16:57:31 +01:00
Tamo
e704728ee7 fix the snapshots permissions on unix system 2023-03-06 16:28:40 +01:00
193 changed files with 973 additions and 1925 deletions

View File

@@ -0,0 +1,28 @@
name: Create issue to upgrade dependencies
on:
schedule:
# Run the first of the month, every 3 month
- cron: '0 0 1 */3 *'
workflow_dispatch:
jobs:
create-issue:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Create an issue
uses: actions-ecosystem/action-create-issue@v1
with:
github_token: ${{ secrets.MEILI_BOT_GH_PAT }}
title: Upgrade dependencies
body: |
This issue is about updating Meilisearch dependencies:
- [ ] Cargo toml dependencies of Meilisearch; but also the main engine-team repositories that Meilisearch depends on (charabia, heed...)
- [ ] If new Rust versions have been released, update the Rust version in the Clippy job of this [GitHub Action file](./.github/workflows/rust.yml)
⚠️ To avoid last minute bugs, this issue should only be done at the beginning of the sprint!
The GitHub action dependencies are managed by [Dependabot](./.github/dependabot.yml)
labels: |
dependencies
maintenance

View File

@@ -1,24 +0,0 @@
name: Create issue to upgrade dependencies
on:
schedule:
# Run the first of the month, every 3 month
- cron: '0 0 1 */3 *'
workflow_dispatch:
jobs:
create-issue:
runs-on: ubuntu-latest
env:
ISSUE_TEMPLATE: issue-template.md
GH_TOKEN: ${{ secrets.MEILI_BOT_GH_PAT }}
steps:
- uses: actions/checkout@v3
- name: Download the issue template
run: curl -s https://raw.githubusercontent.com/meilisearch/engine-team/main/issue-templates/dependency-issue.md > $ISSUE_TEMPLATE
- name: Create issue
run: |
gh issue create \
--title 'Upgrade dependencies' \
--label 'dependencies,maintenance' \
--body-file $ISSUE_TEMPLATE

View File

@@ -1,4 +1,4 @@
name: Benchmarks (manual)
name: Benchmarks
on:
workflow_dispatch:

View File

@@ -1,5 +1,3 @@
name: Publish binaries to GitHub release
on:
workflow_dispatch:
schedule:
@@ -7,6 +5,8 @@ on:
release:
types: [published]
name: Publish binaries to release
jobs:
check-version:
name: Check the version validity
@@ -54,7 +54,7 @@ jobs:
# No need to upload binaries for dry run (cron)
- name: Upload binaries to release
if: github.event_name == 'release'
uses: svenstaro/upload-release-action@2.5.0
uses: svenstaro/upload-release-action@2.4.0
with:
repo_token: ${{ secrets.MEILI_BOT_GH_PAT }}
file: target/release/meilisearch
@@ -87,7 +87,7 @@ jobs:
# No need to upload binaries for dry run (cron)
- name: Upload binaries to release
if: github.event_name == 'release'
uses: svenstaro/upload-release-action@2.5.0
uses: svenstaro/upload-release-action@2.4.0
with:
repo_token: ${{ secrets.MEILI_BOT_GH_PAT }}
file: target/release/${{ matrix.artifact_name }}
@@ -96,14 +96,12 @@ jobs:
publish-macos-apple-silicon:
name: Publish binary for macOS silicon
runs-on: ${{ matrix.os }}
runs-on: macos-12
needs: check-version
strategy:
fail-fast: false
matrix:
include:
- os: macos-12
target: aarch64-apple-darwin
- target: aarch64-apple-darwin
asset_name: meilisearch-macos-apple-silicon
steps:
- name: Checkout repository
@@ -123,7 +121,7 @@ jobs:
- name: Upload the binary to release
# No need to upload binaries for dry run (cron)
if: github.event_name == 'release'
uses: svenstaro/upload-release-action@2.5.0
uses: svenstaro/upload-release-action@2.4.0
with:
repo_token: ${{ secrets.MEILI_BOT_GH_PAT }}
file: target/${{ matrix.target }}/release/meilisearch
@@ -132,21 +130,29 @@ jobs:
publish-aarch64:
name: Publish binary for aarch64
runs-on: ${{ matrix.os }}
runs-on: ubuntu-latest
needs: check-version
container:
# Use ubuntu-18.04 to compile with glibc 2.27
image: ubuntu:18.04
strategy:
fail-fast: false
matrix:
include:
- build: aarch64
os: ubuntu-18.04
target: aarch64-unknown-linux-gnu
linker: gcc-aarch64-linux-gnu
use-cross: true
- target: aarch64-unknown-linux-gnu
asset_name: meilisearch-linux-aarch64
steps:
- name: Checkout repository
uses: actions/checkout@v3
- name: Install needed dependencies
run: |
apt-get update -y && apt upgrade -y
apt-get install -y curl build-essential gcc-aarch64-linux-gnu
- name: Set up Docker for cross compilation
run: |
apt-get install -y curl apt-transport-https ca-certificates software-properties-common
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add -
add-apt-repository "deb [arch=$(dpkg --print-architecture)] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable"
apt-get update -y && apt-get install -y docker-ce
- name: Installing Rust toolchain
uses: actions-rs/toolchain@v1
with:
@@ -154,15 +160,7 @@ jobs:
profile: minimal
target: ${{ matrix.target }}
override: true
- name: APT update
run: |
sudo apt update
- name: Install target specific tools
if: matrix.use-cross
run: |
sudo apt-get install -y ${{ matrix.linker }}
- name: Configure target aarch64 GNU
if: matrix.target == 'aarch64-unknown-linux-gnu'
## Environment variable is not passed using env:
## LD gold won't work with MUSL
# env:
@@ -176,14 +174,16 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: build
use-cross: ${{ matrix.use-cross }}
use-cross: true
args: --release --target ${{ matrix.target }}
env:
CROSS_DOCKER_IN_DOCKER: true
- name: List target output files
run: ls -lR ./target
- name: Upload the binary to release
# No need to upload binaries for dry run (cron)
if: github.event_name == 'release'
uses: svenstaro/upload-release-action@2.5.0
uses: svenstaro/upload-release-action@2.4.0
with:
repo_token: ${{ secrets.MEILI_BOT_GH_PAT }}
file: target/${{ matrix.target }}/release/meilisearch

View File

@@ -1,4 +1,4 @@
name: Publish to APT & Homebrew
name: Publish to APT repository & Homebrew
on:
release:
@@ -35,7 +35,7 @@ jobs:
- name: Build deb package
run: cargo deb -p meilisearch -o target/debian/meilisearch.deb
- name: Upload debian pkg to release
uses: svenstaro/upload-release-action@2.5.0
uses: svenstaro/upload-release-action@2.4.0
with:
repo_token: ${{ secrets.MEILI_BOT_GH_PAT }}
file: target/debian/meilisearch.deb

View File

@@ -1,5 +1,4 @@
name: Publish images to Docker Hub
---
on:
push:
# Will run for every tag pushed except `latest`
@@ -13,6 +12,8 @@ on:
- cron: '0 23 * * *' # Every day at 11:00pm
workflow_dispatch:
name: Publish tagged images to Docker Hub
jobs:
docker:
runs-on: docker

View File

@@ -1,4 +1,4 @@
name: Benchmarks of indexing (push)
name: Benchmarks indexing (push)
on:
push:

View File

@@ -1,4 +1,4 @@
name: Benchmarks of search for geo (push)
name: Benchmarks search geo (push)
on:
push:

View File

@@ -1,4 +1,4 @@
name: Benchmarks of search for songs (push)
name: Benchmarks search songs (push)
on:
push:

View File

@@ -1,4 +1,4 @@
name: Benchmarks of search for Wikipedia articles (push)
name: Benchmarks search wikipedia articles (push)
on:
push:

View File

@@ -1,4 +1,4 @@
name: Test suite
name: Rust
on:
workflow_dispatch:
@@ -25,35 +25,36 @@ jobs:
# Use ubuntu-18.04 to compile with glibc 2.27, which are the production expectations
image: ubuntu:18.04
steps:
- uses: actions/checkout@v3
- name: Install needed dependencies
run: |
apt-get update && apt-get install -y curl
apt-get install build-essential -y
- name: Run test with Rust stable
if: github.event_name != 'schedule'
uses: actions-rs/toolchain@v1
with:
toolchain: stable
override: true
- name: Run test with Rust nightly
if: github.event_name == 'schedule'
uses: actions-rs/toolchain@v1
with:
toolchain: nightly
override: true
- name: Cache dependencies
uses: Swatinem/rust-cache@v2.2.0
- name: Run cargo check without any default features
uses: actions-rs/cargo@v1
with:
command: build
args: --locked --release --no-default-features --all
- name: Run cargo test
uses: actions-rs/cargo@v1
with:
command: test
args: --locked --release --all
- uses: actions/checkout@v3
- name: Install needed dependencies
run: |
apt-get update && apt-get install -y curl
apt-get install build-essential -y
- name: Run test with Rust stable
if: github.event_name != 'schedule'
uses: actions-rs/toolchain@v1
with:
toolchain: stable
override: true
- name: Run test with Rust nightly
if: github.event_name == 'schedule'
uses: actions-rs/toolchain@v1
with:
toolchain: nightly
override: true
# Disable cache due to disk space issues with Windows workers in CI
# - name: Cache dependencies
# uses: Swatinem/rust-cache@v2.2.0
- name: Run cargo check without any default features
uses: actions-rs/cargo@v1
with:
command: build
args: --locked --release --no-default-features --all
- name: Run cargo test
uses: actions-rs/cargo@v1
with:
command: test
args: --locked --release --all
test-others:
name: Tests on ${{ matrix.os }}
@@ -63,47 +64,19 @@ jobs:
matrix:
os: [macos-12, windows-2022]
steps:
- uses: actions/checkout@v3
- name: Cache dependencies
uses: Swatinem/rust-cache@v2.2.0
- name: Run cargo check without any default features
uses: actions-rs/cargo@v1
with:
command: build
args: --locked --release --no-default-features --all
- name: Run cargo test
uses: actions-rs/cargo@v1
with:
command: test
args: --locked --release --all
test-all-features:
name: Tests all features on cron schedule only
runs-on: ubuntu-latest
container:
# Use ubuntu-18.04 to compile with glibc 2.27, which are the production expectations
image: ubuntu:18.04
if: github.event_name == 'schedule'
steps:
- uses: actions/checkout@v3
- name: Install needed dependencies
run: |
apt-get update
apt-get install --assume-yes build-essential curl
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
override: true
- name: Run cargo build with all features
uses: actions-rs/cargo@v1
with:
command: build
args: --workspace --locked --release --all-features
- name: Run cargo test with all features
uses: actions-rs/cargo@v1
with:
command: test
args: --workspace --locked --release --all-features
- uses: actions/checkout@v3
# - name: Cache dependencies
# uses: Swatinem/rust-cache@v2.2.0
- name: Run cargo check without any default features
uses: actions-rs/cargo@v1
with:
command: build
args: --locked --release --no-default-features --all
- name: Run cargo test
uses: actions-rs/cargo@v1
with:
command: test
args: --locked --release --all
# We run tests in debug also, to make sure that the debug_assertions are hit
test-debug:
@@ -122,8 +95,8 @@ jobs:
with:
toolchain: stable
override: true
- name: Cache dependencies
uses: Swatinem/rust-cache@v2.2.0
# - name: Cache dependencies
# uses: Swatinem/rust-cache@v2.2.0
- name: Run tests in debug
uses: actions-rs/cargo@v1
with:
@@ -141,8 +114,8 @@ jobs:
toolchain: 1.67.0
override: true
components: clippy
- name: Cache dependencies
uses: Swatinem/rust-cache@v2.2.0
# - name: Cache dependencies
# uses: Swatinem/rust-cache@v2.2.0
- name: Run cargo clippy
uses: actions-rs/cargo@v1
with:
@@ -161,8 +134,8 @@ jobs:
toolchain: nightly
override: true
components: rustfmt
- name: Cache dependencies
uses: Swatinem/rust-cache@v2.2.0
# - name: Cache dependencies
# uses: Swatinem/rust-cache@v2.2.0
- name: Run cargo fmt
# Since we never ran the `build.rs` script in the benchmark directory we are missing one auto-generated import file.
# Since we want to trigger (and fail) this action as fast as possible, instead of building the benchmark crate

View File

@@ -23,7 +23,7 @@ jobs:
target: x86_64-unknown-linux-musl
- name: Cache dependencies
uses: Swatinem/rust-cache@v2.2.1
uses: Swatinem/rust-cache@v2.2.0
- name: Run cargo check without any default features
uses: actions-rs/cargo@v1
@@ -46,14 +46,14 @@ jobs:
- name: Docker metadata
id: meta
uses: docker/metadata-action@v4
uses: docker/metadata-action@v3
with:
images: registry.uffizzi.com/${{ env.UUID_TAG }}
tags: |
type=raw,value=60d
- name: Build Image
uses: docker/build-push-action@v4
uses: docker/build-push-action@v3
with:
context: ./
file: .github/uffizzi/Dockerfile

129
Cargo.lock generated
View File

@@ -252,7 +252,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e8b47f52ea9bae42228d07ec09eb676433d7c4ed1ebdf0f1d1c29ed446f1ab8"
dependencies = [
"cfg-if",
"cipher 0.3.0",
"cipher",
"cpufeatures",
"opaque-debug",
]
@@ -523,17 +523,6 @@ version = "3.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "572f695136211188308f16ad2ca5c851a712c464060ae6974944458eb83880ba"
[[package]]
name = "bus"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80cb4625f5b60155ff1018c9d4ce2e38bf5ae3e5780dfab9fa68bb44a6b751e2"
dependencies = [
"crossbeam-channel",
"num_cpus",
"parking_lot_core",
]
[[package]]
name = "byte-unit"
version = "4.0.18"
@@ -652,17 +641,6 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chacha20"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7fc89c7c5b9e7a02dfe45cd2367bae382f9ed31c61ca8debe5f827c420a2f08"
dependencies = [
"cfg-if",
"cipher 0.4.4",
"cpufeatures",
]
[[package]]
name = "change-detection"
version = "1.2.0"
@@ -734,16 +712,6 @@ dependencies = [
"generic-array",
]
[[package]]
name = "cipher"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad"
dependencies = [
"crypto-common",
"inout",
]
[[package]]
name = "clap"
version = "3.2.23"
@@ -802,24 +770,6 @@ dependencies = [
"os_str_bytes",
]
[[package]]
name = "cluster"
version = "1.1.0"
dependencies = [
"bus",
"crossbeam",
"ductile",
"log",
"meilisearch-types",
"roaring",
"serde",
"serde_json",
"synchronoise",
"thiserror",
"time",
"uuid 1.3.0",
]
[[package]]
name = "concat-arrays"
version = "0.1.2"
@@ -1198,21 +1148,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "ductile"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12cde25956886749c891a27249630ae99471f1ba05c4a924aad1a6ffe6932812"
dependencies = [
"anyhow",
"bincode",
"chacha20",
"crossbeam-channel",
"log",
"rand",
"serde",
]
[[package]]
name = "dump"
version = "1.1.0"
@@ -1235,14 +1170,14 @@ dependencies = [
"tempfile",
"thiserror",
"time",
"uuid 1.3.0",
"uuid 1.2.2",
]
[[package]]
name = "either"
version = "1.8.1"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91"
checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797"
dependencies = [
"serde",
]
@@ -1441,7 +1376,7 @@ dependencies = [
"faux",
"tempfile",
"thiserror",
"uuid 1.3.0",
"uuid 1.2.2",
]
[[package]]
@@ -1960,7 +1895,6 @@ dependencies = [
"anyhow",
"big_s",
"bincode",
"cluster",
"crossbeam",
"csv",
"derive_builder",
@@ -1981,7 +1915,7 @@ dependencies = [
"tempfile",
"thiserror",
"time",
"uuid 1.3.0",
"uuid 1.2.2",
]
[[package]]
@@ -1995,15 +1929,6 @@ dependencies = [
"serde",
]
[[package]]
name = "inout"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5"
dependencies = [
"generic-array",
]
[[package]]
name = "insta"
version = "1.26.0"
@@ -2548,7 +2473,6 @@ dependencies = [
"bytes",
"cargo_toml",
"clap 4.0.32",
"cluster",
"crossbeam-channel",
"deserr",
"dump",
@@ -2609,7 +2533,7 @@ dependencies = [
"tokio-stream",
"toml",
"urlencoding",
"uuid 1.3.0",
"uuid 1.2.2",
"vergen",
"walkdir",
"yaup",
@@ -2621,7 +2545,6 @@ name = "meilisearch-auth"
version = "1.1.0"
dependencies = [
"base64 0.13.1",
"cluster",
"enum-iterator",
"hmac",
"maplit",
@@ -2633,7 +2556,7 @@ dependencies = [
"sha2",
"thiserror",
"time",
"uuid 1.3.0",
"uuid 1.2.2",
]
[[package]]
@@ -2663,7 +2586,7 @@ dependencies = [
"thiserror",
"time",
"tokio",
"uuid 1.3.0",
"uuid 1.2.2",
]
[[package]]
@@ -2738,7 +2661,7 @@ dependencies = [
"tempfile",
"thiserror",
"time",
"uuid 1.3.0",
"uuid 1.2.2",
]
[[package]]
@@ -3576,9 +3499,9 @@ checksum = "58bc9567378fc7690d6b2addae4e60ac2eeea07becb2c64b9f218b53865cba2a"
[[package]]
name = "serde"
version = "1.0.155"
version = "1.0.152"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71f2b4817415c6d4210bfe1c7bfcf4801b2d904cb4d0e1a8fdb651013c9e86b8"
checksum = "bb7d1f0d3021d347a83e556fc4683dea2ea09d87bccdf88ff5c12545d89d5efb"
dependencies = [
"serde_derive",
]
@@ -3594,9 +3517,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.155"
version = "1.0.152"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d071a94a3fac4aff69d023a7f411e33f40f3483f8c5190b1953822b6b76d7630"
checksum = "af487d118eecd09402d70a5d72551860e788df87b464af30e5ea6a38c75c541e"
dependencies = [
"proc-macro2",
"quote",
@@ -3605,9 +3528,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.94"
version = "1.0.91"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c533a59c9d8a93a09c6ab31f0fd5e5f4dd1b8fc9434804029839884765d04ea"
checksum = "877c235533714907a8c2464236f5c4b2a17262ef1bd71f38f35ea592c8da6883"
dependencies = [
"indexmap",
"itoa 1.0.5",
@@ -3893,18 +3816,18 @@ checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d"
[[package]]
name = "thiserror"
version = "1.0.39"
version = "1.0.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5ab016db510546d856297882807df8da66a16fb8c4101cb8b30054b0d5b2d9c"
checksum = "6a9cd18aa97d5c45c6603caea1da6628790b37f7a34b6ca89522331c5180fed0"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.39"
version = "1.0.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5420d42e90af0c38c3290abcca25b9b3bdf379fc9f55c528f53a269d9c9a267e"
checksum = "1fb327af4685e4d03fa8cbcf1716380da910eeb2bb8be417e7f9fd3fb164f36f"
dependencies = [
"proc-macro2",
"quote",
@@ -3913,9 +3836,9 @@ dependencies = [
[[package]]
name = "time"
version = "0.3.20"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cd0cbfecb4d19b5ea75bb31ad904eb5b9fa13f21079c3b92017ebdf4999a5890"
checksum = "a561bf4617eebd33bca6434b988f39ed798e527f51a1e797d0ee4f61c0a38376"
dependencies = [
"itoa 1.0.5",
"serde",
@@ -3931,9 +3854,9 @@ checksum = "2e153e1f1acaef8acc537e68b44906d2db6436e2b35ac2c6b42640fff91f00fd"
[[package]]
name = "time-macros"
version = "0.2.8"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd80a657e71da814b8e5d60d3374fc6d35045062245d80224748ae522dd76f36"
checksum = "d967f99f534ca7e495c575c62638eebc2898a8c84c119b89e250477bc4ba16b2"
dependencies = [
"time-core",
]
@@ -4178,9 +4101,9 @@ dependencies = [
[[package]]
name = "uuid"
version = "1.3.0"
version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1674845326ee10d37ca60470760d4288a6f80f304007d92e5c53bab78c9cfd79"
checksum = "422ee0de9031b5b948b97a8fc04e3aa35230001a722ddd27943e0be31564ce4c"
dependencies = [
"getrandom",
"serde",

View File

@@ -9,7 +9,6 @@ members = [
"dump",
"file-store",
"permissive-json-pointer",
"cluster",
"milli",
"filter-parser",
"flatten-serde-json",

View File

@@ -1,25 +0,0 @@
[package]
name = "cluster"
publish = false
version.workspace = true
authors.workspace = true
description.workspace = true
homepage.workspace = true
readme.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
ductile = "0.3.0"
serde = { version = "1.0.155", features = ["derive"] }
serde_json = "1.0.94"
thiserror = "1.0.39"
meilisearch-types = { path = "../meilisearch-types" }
roaring = { version = "0.10.1", features = ["serde"] }
log = "0.4.17"
crossbeam = "0.8.2"
bus = "2.3.0"
time = "0.3.20"
uuid = { version = "1.3.0", features = ["v4"] }
synchronoise = "1.0.1"

View File

@@ -1,148 +0,0 @@
use meilisearch_types::milli::update::IndexDocumentsMethod;
use meilisearch_types::settings::{Settings, Unchecked};
use meilisearch_types::tasks::TaskId;
use roaring::RoaringBitmap;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use uuid::Uuid;
/// Represents a combination of tasks that can all be processed at the same time.
///
/// A batch contains the set of tasks that it represents (accessible through
/// [`self.ids()`](Batch::ids)), as well as additional information on how to
/// be processed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Batch {
TaskCancelation {
/// The task cancelation itself.
task: TaskId,
/// The date and time at which the previously processing tasks started.
previous_started_at: OffsetDateTime,
/// The list of tasks that were processing when this task cancelation appeared.
previous_processing_tasks: RoaringBitmap,
},
TaskDeletion(TaskId),
SnapshotCreation(Vec<TaskId>),
Dump(TaskId),
IndexOperation {
op: IndexOperation,
must_create_index: bool,
},
IndexCreation {
index_uid: String,
primary_key: Option<String>,
task: TaskId,
},
IndexUpdate {
index_uid: String,
primary_key: Option<String>,
task: TaskId,
},
IndexDeletion {
index_uid: String,
tasks: Vec<TaskId>,
index_has_been_created: bool,
},
IndexSwap {
task: TaskId,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DocumentOperation {
Add(Uuid),
Delete(Vec<String>),
}
/// A [batch](Batch) that combines multiple tasks operating on an index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IndexOperation {
DocumentOperation {
index_uid: String,
primary_key: Option<String>,
method: IndexDocumentsMethod,
documents_counts: Vec<u64>,
operations: Vec<DocumentOperation>,
tasks: Vec<TaskId>,
},
DocumentDeletion {
index_uid: String,
// The vec associated with each document deletion tasks.
documents: Vec<Vec<String>>,
tasks: Vec<TaskId>,
},
DocumentClear {
index_uid: String,
tasks: Vec<TaskId>,
},
Settings {
index_uid: String,
// The boolean indicates if it's a settings deletion or creation.
settings: Vec<(bool, Settings<Unchecked>)>,
tasks: Vec<TaskId>,
},
DocumentClearAndSetting {
index_uid: String,
cleared_tasks: Vec<TaskId>,
// The boolean indicates if it's a settings deletion or creation.
settings: Vec<(bool, Settings<Unchecked>)>,
settings_tasks: Vec<TaskId>,
},
SettingsAndDocumentOperation {
index_uid: String,
primary_key: Option<String>,
method: IndexDocumentsMethod,
documents_counts: Vec<u64>,
operations: Vec<DocumentOperation>,
document_import_tasks: Vec<TaskId>,
// The boolean indicates if it's a settings deletion or creation.
settings: Vec<(bool, Settings<Unchecked>)>,
settings_tasks: Vec<TaskId>,
},
}
impl Batch {
pub fn ids(&self) -> impl Iterator<Item = TaskId> {
type Ret = Box<dyn Iterator<Item = TaskId>>;
match self {
Batch::TaskCancelation { task, .. } => Box::new(std::iter::once(*task)) as Ret,
Batch::TaskDeletion(task) => Box::new(std::iter::once(*task)) as Ret,
Batch::SnapshotCreation(tasks) => Box::new(tasks.clone().into_iter()) as Ret,
Batch::Dump(task) => Box::new(std::iter::once(*task)) as Ret,
Batch::IndexOperation { op, .. } => match op {
IndexOperation::DocumentOperation { tasks, .. } => {
Box::new(tasks.clone().into_iter()) as Ret
}
IndexOperation::DocumentDeletion { tasks, .. } => {
Box::new(tasks.clone().into_iter()) as Ret
}
IndexOperation::DocumentClear { tasks, .. } => {
Box::new(tasks.clone().into_iter()) as Ret
}
IndexOperation::Settings { tasks, .. } => {
Box::new(tasks.clone().into_iter()) as Ret
}
IndexOperation::DocumentClearAndSetting {
cleared_tasks, settings_tasks, ..
} => {
Box::new(cleared_tasks.clone().into_iter().chain(settings_tasks.clone())) as Ret
}
IndexOperation::SettingsAndDocumentOperation {
document_import_tasks,
settings_tasks,
..
} => Box::new(
document_import_tasks.clone().into_iter().chain(settings_tasks.clone()),
) as Ret,
},
Batch::IndexCreation { task, .. } => Box::new(std::iter::once(*task)) as Ret,
Batch::IndexUpdate { task, .. } => Box::new(std::iter::once(*task)) as Ret,
Batch::IndexDeletion { tasks, .. } => Box::new(tasks.clone().into_iter()) as Ret,
Batch::IndexSwap { task } => Box::new(std::iter::once(*task)) as Ret,
}
}
}

View File

@@ -1,276 +0,0 @@
use std::net::ToSocketAddrs;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{atomic, Arc, Mutex, RwLock};
use std::time::Duration;
use bus::{Bus, BusReader};
use crossbeam::channel::{unbounded, Receiver, Sender};
use ductile::{ChannelReceiver, ChannelSender, ChannelServer};
use log::{info, warn};
use meilisearch_types::keys::Key;
use meilisearch_types::tasks::Task;
use synchronoise::SignalEvent;
use uuid::Uuid;
use crate::batch::Batch;
use crate::{ApiKeyOperation, Consistency, FollowerMsg, LeaderMsg};
#[derive(Clone)]
pub struct Leader {
task_ready_to_commit: Receiver<u32>,
broadcast_to_follower: Sender<LeaderMsg>,
needs_key_sender: Sender<Sender<Vec<Key>>>,
needs_key_receiver: Receiver<Sender<Vec<Key>>>,
pub wake_up: Arc<SignalEvent>,
new_followers: Arc<AtomicUsize>,
active_followers: Arc<AtomicUsize>,
batch_id: Arc<RwLock<u32>>,
}
impl Leader {
pub fn new(
listen_on: impl ToSocketAddrs + Send + 'static,
master_key: Option<String>,
) -> Leader {
let new_followers = Arc::new(AtomicUsize::new(0));
let active_followers = Arc::new(AtomicUsize::new(1));
let wake_up = Arc::new(SignalEvent::auto(true));
let (broadcast_to_follower, process_batch_receiver) = unbounded();
let (task_finished_sender, task_finished_receiver) = unbounded();
let (needs_key_sender, needs_key_receiver) = unbounded();
let nf = new_followers.clone();
let af = active_followers.clone();
let wu = wake_up.clone();
std::thread::spawn(move || {
Self::listener(
listen_on,
master_key,
nf,
af,
wu,
process_batch_receiver,
task_finished_sender,
)
});
Leader {
task_ready_to_commit: task_finished_receiver,
broadcast_to_follower,
needs_key_sender,
needs_key_receiver,
wake_up,
new_followers,
active_followers,
batch_id: Arc::default(),
}
}
pub fn has_new_followers(&self) -> bool {
self.new_followers.load(Ordering::Relaxed) != 0
}
/// Takes all the necessary channels to chat with the scheduler and give them
/// to each new followers
fn listener(
listen_on: impl ToSocketAddrs,
master_key: Option<String>,
new_followers: Arc<AtomicUsize>,
active_followers: Arc<AtomicUsize>,
wake_up: Arc<SignalEvent>,
broadcast_to_follower: Receiver<LeaderMsg>,
task_finished: Sender<u32>,
) {
let listener: ChannelServer<LeaderMsg, FollowerMsg> = if let Some(ref master_key) =
master_key
{
let mut enc = [0; 32];
let master_key = master_key.as_bytes();
if master_key.len() < 32 {
warn!("Master key is not secure, use a longer master key (at least 32 bytes long)");
}
enc.iter_mut().zip(master_key).for_each(|(enc, mk)| *enc = *mk);
info!("Listening with encryption enabled");
ChannelServer::bind_with_enc(listen_on, enc).unwrap()
} else {
ChannelServer::bind(listen_on).unwrap()
};
info!("Ready to the receive connections");
// We're going to broadcast all the batches to all our follower
let bus: Bus<LeaderMsg> = Bus::new(10);
let bus = Arc::new(Mutex::new(bus));
let b = bus.clone();
std::thread::spawn(move || loop {
let msg = broadcast_to_follower.recv().expect("Main thread is dead");
b.lock().unwrap().broadcast(msg);
});
for (sender, receiver, _addr) in listener {
let task_finished = task_finished.clone();
let nf = new_followers.clone();
let af = active_followers.clone();
let wu = wake_up.clone();
let process_batch = bus.lock().unwrap().add_rx();
std::thread::spawn(move || {
Self::follower(sender, receiver, nf, af, wu, process_batch, task_finished)
});
}
}
/// Allow a follower to chat with the scheduler
fn follower(
sender: ChannelSender<LeaderMsg>,
receiver: ChannelReceiver<FollowerMsg>,
new_followers: Arc<AtomicUsize>,
active_followers: Arc<AtomicUsize>,
wake_up: Arc<SignalEvent>,
mut broadcast_to_follower: BusReader<LeaderMsg>,
task_finished: Sender<u32>,
) {
let size = new_followers.fetch_add(1, Ordering::Relaxed) + 1;
wake_up.signal();
info!("A new follower joined the cluster. {} members.", size);
loop {
if let msg @ LeaderMsg::JoinFromDump(_) =
broadcast_to_follower.recv().expect("Main thread died")
{
// we exit the new_follower state and become an active follower even though
// the dump will takes some time to index
new_followers.fetch_sub(1, Ordering::Relaxed);
let size = active_followers.fetch_add(1, Ordering::Relaxed) + 1;
info!("A new follower became active. {} active members.", size);
sender.send(msg).unwrap();
break;
}
}
// send messages to the follower
std::thread::spawn(move || loop {
let msg = broadcast_to_follower.recv().expect("Main thread died");
match msg {
LeaderMsg::JoinFromDump(_) => (),
msg => {
if sender.send(msg).is_err() {
// the follower died, the logging and cluster size update should be done
// in the other thread
break;
}
}
}
});
// receive messages from the follower
loop {
match receiver.recv() {
Err(_) => break,
Ok(msg) => match msg {
FollowerMsg::ReadyToCommit(id) => {
task_finished.send(id).expect("Can't reach the main thread")
}
FollowerMsg::RegisterNewTask(_) => todo!(),
},
}
}
// if we exited from the previous loop it means the follower is down and should
// be removed from the cluster
let size = active_followers.fetch_sub(1, atomic::Ordering::Relaxed) - 1;
info!("A follower left the cluster. {} members.", size);
}
// ============= Everything related to the setup of the cluster
pub fn join_me(&self, dump: Vec<u8>) {
self.broadcast_to_follower
.send(LeaderMsg::JoinFromDump(dump))
.expect("Lost the link with the followers");
}
// ============= Everything related to the scheduler
pub fn starts_batch(&self, batch: Batch) {
let mut batch_id = self.batch_id.write().unwrap();
info!("Send the batch to process to the followers");
*batch_id += 1;
self.broadcast_to_follower
.send(LeaderMsg::StartBatch { id: *batch_id, batch })
.expect("Can't reach the cluster");
}
pub fn commit(&self, consistency_level: Consistency) {
info!("Wait until enough followers are ready to commit a batch");
let batch_id = self.batch_id.write().unwrap();
let mut nodes_ready_to_commit = 1;
loop {
let size = self.active_followers.load(atomic::Ordering::Relaxed);
info!("{nodes_ready_to_commit} nodes are ready to commit for a cluster size of {size}");
let all = nodes_ready_to_commit == size;
match consistency_level {
Consistency::One if nodes_ready_to_commit >= 1 || all => break,
Consistency::Two if nodes_ready_to_commit >= 2 || all => break,
Consistency::Quorum if nodes_ready_to_commit >= (size / 2) || all => break,
Consistency::All if all => break,
_ => (),
}
// we can't wait forever here because if a node dies the cluster size might get updated while we're stuck
match self.task_ready_to_commit.recv_timeout(Duration::new(1, 0)) {
Ok(id) if id == *batch_id => nodes_ready_to_commit += 1,
_ => continue,
};
}
info!("Tells all the follower to commit");
self.broadcast_to_follower.send(LeaderMsg::Commit(*batch_id)).unwrap();
}
pub fn register_new_task(&self, task: Task, update_file: Option<Vec<u8>>) {
info!("Tells all the follower to register a new task");
self.broadcast_to_follower
.send(LeaderMsg::RegisterNewTask { task, update_file })
.expect("Main thread is dead");
}
// ============= Everything related to the api-keys
pub fn insert_key(&self, key: Key) {
self.broadcast_to_follower
.send(LeaderMsg::ApiKeyOperation(ApiKeyOperation::Insert(key)))
.unwrap()
}
pub fn delete_key(&self, uuid: Uuid) {
self.broadcast_to_follower
.send(LeaderMsg::ApiKeyOperation(ApiKeyOperation::Delete(uuid)))
.unwrap()
}
pub fn needs_keys(&self) -> Sender<Vec<Key>> {
self.needs_key_receiver.recv().expect("The cluster is dead")
}
pub fn get_keys(&self) -> Vec<Key> {
let (send, rcv) = crossbeam::channel::bounded(1);
self.needs_key_sender.send(send).expect("The cluster is dead");
rcv.recv().expect("The auth controller is dead")
}
}

View File

@@ -1,231 +0,0 @@
use std::net::ToSocketAddrs;
use std::str::FromStr;
use std::sync::{Arc, RwLock};
use batch::Batch;
use crossbeam::channel::{unbounded, Receiver, Sender};
use ductile::{connect_channel, connect_channel_with_enc, ChannelReceiver, ChannelSender};
use log::{info, warn};
use meilisearch_types::keys::Key;
use meilisearch_types::tasks::{KindWithContent, Task};
use serde::{Deserialize, Serialize};
pub mod batch;
mod leader;
pub use leader::Leader;
use uuid::Uuid;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("Network issue occured")]
NetworkIssue,
#[error("Internal error:{0}")]
SerdeJson(#[from] serde_json::Error),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LeaderMsg {
/// A dump to join the cluster
JoinFromDump(Vec<u8>),
/// Starts a new batch
StartBatch { id: u32, batch: Batch },
///Tell the follower to commit the update asap
Commit(u32),
///Tell the follower to commit the update asap
RegisterNewTask { task: Task, update_file: Option<Vec<u8>> },
///Tell the follower to commit the update asap
ApiKeyOperation(ApiKeyOperation),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FollowerMsg {
// Let the leader knows you're ready to commit
ReadyToCommit(u32),
RegisterNewTask(KindWithContent),
}
#[derive(Default, Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum Consistency {
One,
Two,
Quorum,
#[default]
All,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ApiKeyOperation {
Insert(Key),
Delete(Uuid),
}
impl std::fmt::Display for Consistency {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Consistency::One => write!(f, "one"),
Consistency::Two => write!(f, "two"),
Consistency::Quorum => write!(f, "quorum"),
Consistency::All => write!(f, "all"),
}
}
}
impl FromStr for Consistency {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"one" => Ok(Consistency::One),
"two" => Ok(Consistency::Two),
"quorum" => Ok(Consistency::Quorum),
"all" => Ok(Consistency::All),
s => Err(format!(
"Unexpected value `{s}`, expected one of `one`, `two`, `quorum`, `all`"
)),
}
}
}
#[derive(Clone)]
pub enum Cluster {
Leader(Leader),
Follower(Follower),
}
#[derive(Clone)]
pub struct Follower {
sender: ChannelSender<FollowerMsg>,
get_batch: Receiver<(u32, Batch)>,
must_commit: Receiver<u32>,
register_new_task: Receiver<(Task, Option<Vec<u8>>)>,
api_key_op: Receiver<ApiKeyOperation>,
batch_id: Arc<RwLock<u32>>,
}
impl Follower {
pub fn join(leader: impl ToSocketAddrs, master_key: Option<String>) -> (Follower, Vec<u8>) {
let (sender, receiver) = if let Some(master_key) = master_key {
let mut enc = [0; 32];
let master_key = master_key.as_bytes();
if master_key.len() < 32 {
warn!("Master key is not secure, use a longer master key (at least 32 bytes long)");
}
enc.iter_mut().zip(master_key).for_each(|(enc, mk)| *enc = *mk);
info!("Connecting with encryption enabled");
connect_channel_with_enc(leader, &enc).unwrap()
} else {
connect_channel(leader).unwrap()
};
info!("Connection to the leader established");
info!("Waiting for the leader to contact us");
let state = receiver.recv().unwrap();
let dump = match state {
LeaderMsg::JoinFromDump(dump) => dump,
msg => panic!("Received unexpected message {msg:?}"),
};
let (get_batch_sender, get_batch_receiver) = unbounded();
let (must_commit_sender, must_commit_receiver) = unbounded();
let (register_task_sender, register_task_receiver) = unbounded();
let (create_api_key_sender, create_api_key_receiver) = unbounded();
std::thread::spawn(move || {
Self::router(
receiver,
get_batch_sender,
must_commit_sender,
register_task_sender,
create_api_key_sender,
);
});
(
Follower {
sender,
get_batch: get_batch_receiver,
must_commit: must_commit_receiver,
register_new_task: register_task_receiver,
api_key_op: create_api_key_receiver,
batch_id: Arc::default(),
},
dump,
)
}
fn router(
receiver: ChannelReceiver<LeaderMsg>,
get_batch: Sender<(u32, Batch)>,
must_commit: Sender<u32>,
register_new_task: Sender<(Task, Option<Vec<u8>>)>,
api_key_op: Sender<ApiKeyOperation>,
) {
loop {
match receiver.recv().expect("Lost connection to the leader") {
LeaderMsg::JoinFromDump(_) => {
warn!("Received a join from dump msg but Im already running : ignoring the message")
}
LeaderMsg::StartBatch { id, batch } => {
info!("Starting to process a new batch");
get_batch.send((id, batch)).expect("Lost connection to the main thread")
}
LeaderMsg::Commit(id) => {
info!("Must commit");
must_commit.send(id).expect("Lost connection to the main thread")
}
LeaderMsg::RegisterNewTask { task, update_file } => {
info!("Registered a new task");
register_new_task
.send((task, update_file))
.expect("Lost connection to the main thread")
}
LeaderMsg::ApiKeyOperation(key) => {
api_key_op.send(key).expect("Lost connection to the main thread")
}
}
}
}
pub fn get_new_batch(&self) -> Batch {
info!("Get new batch called");
let (id, batch) = self.get_batch.recv().expect("Lost connection to the leader");
info!("Got a new batch");
*self.batch_id.write().unwrap() = id;
batch
}
pub fn ready_to_commit(&self) {
info!("I'm ready to commit");
let batch_id = self.batch_id.read().unwrap();
self.sender.send(FollowerMsg::ReadyToCommit(*batch_id)).unwrap();
loop {
let id = self.must_commit.recv().expect("Lost connection to the leader");
#[allow(clippy::comparison_chain)]
if id == *batch_id {
break;
} else if id > *batch_id {
panic!("We missed a batch");
}
}
info!("I got the right to commit");
}
pub fn get_new_task(&self) -> (Task, Option<Vec<u8>>) {
self.register_new_task.recv().expect("Lost connection to the leader")
}
pub fn api_key_operation(&self) -> ApiKeyOperation {
info!("Creating a new api key");
self.api_key_op.recv().expect("Lost connection to the leader")
}
}

View File

@@ -13,8 +13,6 @@ license.workspace = true
[dependencies]
anyhow = "1.0.64"
bincode = "1.3.3"
cluster = { path = "../cluster" }
crossbeam = "0.8.2"
csv = "1.1.6"
derive_builder = "0.11.2"
dump = { path = "../dump" }

View File

@@ -22,8 +22,7 @@ use std::ffi::OsStr;
use std::fs::{self, File};
use std::io::BufWriter;
use crossbeam::utils::Backoff;
use dump::{DumpWriter, IndexMetadata};
use dump::IndexMetadata;
use log::{debug, error, info};
use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
@@ -42,14 +41,14 @@ use uuid::Uuid;
use crate::autobatcher::{self, BatchKind};
use crate::utils::{self, swap_index_uid_in_task};
use crate::{Cluster, Error, IndexScheduler, ProcessingTasks, Result, TaskId};
use crate::{Error, IndexScheduler, ProcessingTasks, Result, TaskId};
/// Represents a combination of tasks that can all be processed at the same time.
///
/// A batch contains the set of tasks that it represents (accessible through
/// [`self.ids()`](Batch::ids)), as well as additional information on how to
/// be processed.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub(crate) enum Batch {
TaskCancelation {
/// The task cancelation itself.
@@ -86,14 +85,14 @@ pub(crate) enum Batch {
},
}
#[derive(Debug, Clone)]
#[derive(Debug)]
pub(crate) enum DocumentOperation {
Add(Uuid),
Delete(Vec<String>),
}
/// A [batch](Batch) that combines multiple tasks operating on an index.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub(crate) enum IndexOperation {
DocumentOperation {
index_uid: String,
@@ -587,12 +586,6 @@ impl IndexScheduler {
_ => unreachable!(),
}
match &self.cluster {
Some(Cluster::Leader(leader)) => leader.commit(self.consistency_level),
Some(Cluster::Follower(follower)) => follower.ready_to_commit(),
None => (),
}
// We must only remove the content files if the transaction is successfully committed
// and if errors occurs when we are deleting files we must do our best to delete
// everything. We do not return the encountered errors when deleting the content
@@ -636,13 +629,6 @@ impl IndexScheduler {
}
_ => unreachable!(),
}
match &self.cluster {
Some(Cluster::Leader(leader)) => leader.commit(self.consistency_level),
Some(Cluster::Follower(follower)) => follower.ready_to_commit(),
None => (),
}
wtxn.commit()?;
Ok(vec![task])
}
@@ -689,9 +675,6 @@ impl IndexScheduler {
}
// 3. Snapshot every indexes
// TODO we are opening all of the indexes it can be too much we should unload all
// of the indexes we are trying to open. It would be even better to only unload
// the ones that were opened by us. Or maybe use a LRU in the index mapper.
for result in self.index_mapper.index_mapping.iter(&rtxn)? {
let (name, uuid) = result?;
let index = self.index_mapper.index(&rtxn, name)?;
@@ -728,6 +711,14 @@ impl IndexScheduler {
// 5.3 Change the permission to make the snapshot readonly
let mut permissions = file.metadata()?.permissions();
permissions.set_readonly(true);
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
#[allow(clippy::non_octal_unix_permissions)]
// rwxrwxrwx
permissions.set_mode(0b100100100);
}
file.set_permissions(permissions)?;
for task in &mut tasks {
@@ -737,9 +728,96 @@ impl IndexScheduler {
Ok(tasks)
}
Batch::Dump(mut task) => {
// TODO: It would be better to use the started_at from the task instead of generating a new one
let started_at = OffsetDateTime::now_utc();
let dump = self.create_dump(&task, &started_at)?;
let (keys, instance_uid) =
if let KindWithContent::DumpCreation { keys, instance_uid } = &task.kind {
(keys, instance_uid)
} else {
unreachable!();
};
let dump = dump::DumpWriter::new(*instance_uid)?;
// 1. dump the keys
let mut dump_keys = dump.create_keys()?;
for key in keys {
dump_keys.push_key(key)?;
}
dump_keys.flush()?;
let rtxn = self.env.read_txn()?;
// 2. dump the tasks
let mut dump_tasks = dump.create_tasks_queue()?;
for ret in self.all_tasks.iter(&rtxn)? {
let (_, mut t) = ret?;
let status = t.status;
let content_file = t.content_uuid();
// In the case we're dumping ourselves we want to be marked as finished
// to not loop over ourselves indefinitely.
if t.uid == task.uid {
let finished_at = OffsetDateTime::now_utc();
// We're going to fake the date because we don't know if everything is going to go well.
// But we need to dump the task as finished and successful.
// If something fail everything will be set appropriately in the end.
t.status = Status::Succeeded;
t.started_at = Some(started_at);
t.finished_at = Some(finished_at);
}
let mut dump_content_file = dump_tasks.push_task(&t.into())?;
// 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
if let Some(content_file) = content_file {
if status == Status::Enqueued {
let content_file = self.file_store.get_update(content_file)?;
let reader = DocumentsBatchReader::from_reader(content_file)
.map_err(milli::Error::from)?;
let (mut cursor, documents_batch_index) =
reader.into_cursor_and_fields_index();
while let Some(doc) =
cursor.next_document().map_err(milli::Error::from)?
{
dump_content_file.push_document(&obkv_to_object(
&doc,
&documents_batch_index,
)?)?;
}
dump_content_file.flush()?;
}
}
}
dump_tasks.flush()?;
// 3. Dump the indexes
self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> {
let rtxn = index.read_txn()?;
let metadata = IndexMetadata {
uid: uid.to_owned(),
primary_key: index.primary_key(&rtxn)?.map(String::from),
created_at: index.created_at(&rtxn)?,
updated_at: index.updated_at(&rtxn)?,
};
let mut index_dumper = dump.create_index(uid, &metadata)?;
let fields_ids_map = index.fields_ids_map(&rtxn)?;
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
// 3.1. Dump the documents
for ret in index.all_documents(&rtxn)? {
let (_id, doc) = ret?;
let document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc)?;
index_dumper.push_document(&document)?;
}
// 3.2. Dump the settings
let settings = meilisearch_types::settings::settings(index, &rtxn)?;
index_dumper.settings(&settings)?;
Ok(())
})?;
let dump_uid = started_at.format(format_description!(
"[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]"
@@ -755,27 +833,38 @@ impl IndexScheduler {
Ok(vec![task])
}
Batch::IndexOperation { op, must_create_index } => {
let index_uid = op.index_uid();
let index_uid = op.index_uid().to_string();
let index = if must_create_index {
// create the index if it doesn't already exist
let wtxn = self.env.write_txn()?;
self.index_mapper.create_index(wtxn, index_uid, None)?
self.index_mapper.create_index(wtxn, &index_uid, None)?
} else {
let rtxn = self.env.read_txn()?;
self.index_mapper.index(&rtxn, index_uid)?
self.index_mapper.index(&rtxn, &index_uid)?
};
let mut index_wtxn = index.write_txn()?;
let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?;
match &self.cluster {
Some(Cluster::Leader(leader)) => leader.commit(self.consistency_level),
Some(Cluster::Follower(follower)) => follower.ready_to_commit(),
None => (),
}
index_wtxn.commit()?;
// if the update processed successfully, we're going to store the new
// stats of the index. Since the tasks have already been processed and
// this is a non-critical operation. If it fails, we should not fail
// the entire batch.
let res = || -> Result<()> {
let index_rtxn = index.read_txn()?;
let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn)?;
let mut wtxn = self.env.write_txn()?;
self.index_mapper.store_stats_of(&mut wtxn, &index_uid, &stats)?;
wtxn.commit()?;
Ok(())
}();
match res {
Ok(_) => (),
Err(e) => error!("Could not write the stats of the index {}", e),
}
Ok(tasks)
}
Batch::IndexCreation { index_uid, primary_key, task } => {
@@ -806,9 +895,31 @@ impl IndexScheduler {
)?;
index_wtxn.commit()?;
}
// drop rtxn before starting a new wtxn on the same db
rtxn.commit()?;
task.status = Status::Succeeded;
task.details = Some(Details::IndexInfo { primary_key });
// if the update processed successfully, we're going to store the new
// stats of the index. Since the tasks have already been processed and
// this is a non-critical operation. If it fails, we should not fail
// the entire batch.
let res = || -> Result<()> {
let mut wtxn = self.env.write_txn()?;
let index_rtxn = index.read_txn()?;
let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn)?;
self.index_mapper.store_stats_of(&mut wtxn, &index_uid, &stats)?;
wtxn.commit()?;
Ok(())
}();
match res {
Ok(_) => (),
Err(e) => error!("Could not write the stats of the index {}", e),
}
Ok(vec![task])
}
Batch::IndexDeletion { index_uid, index_has_been_created, mut tasks } => {
@@ -872,13 +983,6 @@ impl IndexScheduler {
for swap in swaps {
self.apply_index_swap(&mut wtxn, task.uid, &swap.indexes.0, &swap.indexes.1)?;
}
match &self.cluster {
Some(Cluster::Leader(leader)) => leader.commit(self.consistency_level),
Some(Cluster::Follower(follower)) => follower.ready_to_commit(),
None => (),
}
wtxn.commit()?;
task.status = Status::Succeeded;
Ok(vec![task])
@@ -886,99 +990,6 @@ impl IndexScheduler {
}
}
pub(crate) fn create_dump(
&self,
task: &Task,
started_at: &OffsetDateTime,
) -> Result<DumpWriter> {
let (keys, instance_uid) =
if let KindWithContent::DumpCreation { keys, instance_uid } = &task.kind {
(keys, instance_uid)
} else {
unreachable!();
};
let dump = dump::DumpWriter::new(*instance_uid)?;
// 1. dump the keys
let mut dump_keys = dump.create_keys()?;
for key in keys {
dump_keys.push_key(key)?;
}
dump_keys.flush()?;
let rtxn = self.env.read_txn()?;
// 2. dump the tasks
let mut dump_tasks = dump.create_tasks_queue()?;
for ret in self.all_tasks.iter(&rtxn)? {
let (_, mut t) = ret?;
let status = t.status;
let content_file = t.content_uuid();
// In the case we're dumping ourselves we want to be marked as finished
// to not loop over ourselves indefinitely.
if t.uid == task.uid {
let finished_at = OffsetDateTime::now_utc();
// We're going to fake the date because we don't know if everything is going to go well.
// But we need to dump the task as finished and successful.
// If something fail everything will be set appropriately in the end.
t.status = Status::Succeeded;
t.started_at = Some(*started_at);
t.finished_at = Some(finished_at);
}
let mut dump_content_file = dump_tasks.push_task(&t.into())?;
// 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
if let Some(content_file) = content_file {
if status == Status::Enqueued {
let content_file = self.file_store.get_update(content_file)?;
let reader = DocumentsBatchReader::from_reader(content_file)
.map_err(milli::Error::from)?;
let (mut cursor, documents_batch_index) = reader.into_cursor_and_fields_index();
while let Some(doc) = cursor.next_document().map_err(milli::Error::from)? {
dump_content_file
.push_document(&obkv_to_object(&doc, &documents_batch_index)?)?;
}
dump_content_file.flush()?;
}
}
}
dump_tasks.flush()?;
// 3. Dump the indexes
self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> {
let rtxn = index.read_txn()?;
let metadata = IndexMetadata {
uid: uid.to_owned(),
primary_key: index.primary_key(&rtxn)?.map(String::from),
created_at: index.created_at(&rtxn)?,
updated_at: index.updated_at(&rtxn)?,
};
let mut index_dumper = dump.create_index(uid, &metadata)?;
let fields_ids_map = index.fields_ids_map(&rtxn)?;
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
// 3.1. Dump the documents
for ret in index.all_documents(&rtxn)? {
let (_id, doc) = ret?;
let document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc)?;
index_dumper.push_document(&document)?;
}
// 3.2. Dump the settings
let settings = meilisearch_types::settings::settings(index, &rtxn)?;
index_dumper.settings(&settings)?;
Ok(())
})?;
Ok(dump)
}
/// Swap the index `lhs` with the index `rhs`.
fn apply_index_swap(&self, wtxn: &mut RwTxn, task_id: u32, lhs: &str, rhs: &str) -> Result<()> {
// 1. Verify that both lhs and rhs are existing indexes
@@ -1409,274 +1420,4 @@ impl IndexScheduler {
Ok(content_files_to_delete)
}
pub(crate) fn get_batch_from_cluster_batch(
&self,
batch: cluster::batch::Batch,
) -> Result<Batch> {
use cluster::batch::Batch as CBatch;
let mut rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
for id in batch.ids() {
let backoff = Backoff::new();
let id = BEU32::new(id);
loop {
if self.all_tasks.get(&rtxn, &id)?.is_some() {
info!("Found the task_id");
break;
}
info!("The task is not present in the task queue, we wait");
// we need to drop the txn to make a write visible
drop(rtxn);
backoff.spin();
rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
}
}
Ok(match batch {
CBatch::TaskCancelation { task, previous_started_at, previous_processing_tasks } => {
Batch::TaskCancelation {
task: self.get_existing_tasks(&rtxn, Some(task))?[0].clone(),
previous_started_at,
previous_processing_tasks,
}
}
CBatch::TaskDeletion(task) => {
Batch::TaskDeletion(self.get_existing_tasks(&rtxn, Some(task))?[0].clone())
}
CBatch::SnapshotCreation(tasks) => {
Batch::SnapshotCreation(self.get_existing_tasks(&rtxn, tasks)?)
}
CBatch::Dump(task) => {
Batch::Dump(self.get_existing_tasks(&rtxn, Some(task))?[0].clone())
}
CBatch::IndexOperation { op, must_create_index } => Batch::IndexOperation {
op: self.get_index_op_from_cluster_index_op(&rtxn, op)?,
must_create_index,
},
CBatch::IndexCreation { index_uid, primary_key, task } => Batch::IndexCreation {
index_uid,
primary_key,
task: self.get_existing_tasks(&rtxn, Some(task))?[0].clone(),
},
CBatch::IndexUpdate { index_uid, primary_key, task } => Batch::IndexUpdate {
index_uid,
primary_key,
task: self.get_existing_tasks(&rtxn, Some(task))?[0].clone(),
},
CBatch::IndexDeletion { index_uid, tasks, index_has_been_created } => {
Batch::IndexDeletion {
index_uid,
tasks: self.get_existing_tasks(&rtxn, tasks)?,
index_has_been_created,
}
}
CBatch::IndexSwap { task } => {
Batch::IndexSwap { task: self.get_existing_tasks(&rtxn, Some(task))?[0].clone() }
}
})
}
pub(crate) fn get_index_op_from_cluster_index_op(
&self,
rtxn: &RoTxn,
op: cluster::batch::IndexOperation,
) -> Result<IndexOperation> {
use cluster::batch::IndexOperation as COp;
Ok(match op {
COp::DocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations,
tasks,
} => IndexOperation::DocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations: operations.into_iter().map(|op| op.into()).collect(),
tasks: self.get_existing_tasks(rtxn, tasks)?,
},
COp::DocumentDeletion { index_uid, documents, tasks } => {
IndexOperation::DocumentDeletion {
index_uid,
documents,
tasks: self.get_existing_tasks(rtxn, tasks)?,
}
}
COp::DocumentClear { index_uid, tasks } => IndexOperation::DocumentClear {
index_uid,
tasks: self.get_existing_tasks(rtxn, tasks)?,
},
COp::Settings { index_uid, settings, tasks } => IndexOperation::Settings {
index_uid,
settings,
tasks: self.get_existing_tasks(rtxn, tasks)?,
},
COp::DocumentClearAndSetting { index_uid, cleared_tasks, settings, settings_tasks } => {
IndexOperation::DocumentClearAndSetting {
index_uid,
cleared_tasks: self.get_existing_tasks(rtxn, cleared_tasks)?,
settings,
settings_tasks: self.get_existing_tasks(rtxn, settings_tasks)?,
}
}
COp::SettingsAndDocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations,
document_import_tasks,
settings,
settings_tasks,
} => IndexOperation::SettingsAndDocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations: operations.into_iter().map(|op| op.into()).collect(),
document_import_tasks: self.get_existing_tasks(rtxn, document_import_tasks)?,
settings,
settings_tasks: self.get_existing_tasks(rtxn, settings_tasks)?,
},
})
}
}
impl From<Batch> for cluster::batch::Batch {
fn from(batch: Batch) -> Self {
use cluster::batch::Batch as CBatch;
match batch {
Batch::TaskCancelation { task, previous_started_at, previous_processing_tasks } => {
CBatch::TaskCancelation {
task: task.uid,
previous_started_at,
previous_processing_tasks,
}
}
Batch::TaskDeletion(task) => CBatch::TaskDeletion(task.uid),
Batch::SnapshotCreation(task) => {
CBatch::SnapshotCreation(task.into_iter().map(|task| task.uid).collect())
}
Batch::Dump(task) => CBatch::Dump(task.uid),
Batch::IndexOperation { op, must_create_index } => {
CBatch::IndexOperation { op: op.into(), must_create_index }
}
Batch::IndexCreation { index_uid, primary_key, task } => {
CBatch::IndexCreation { index_uid, primary_key, task: task.uid }
}
Batch::IndexUpdate { index_uid, primary_key, task } => {
CBatch::IndexUpdate { index_uid, primary_key, task: task.uid }
}
Batch::IndexDeletion { index_uid, tasks, index_has_been_created } => {
CBatch::IndexDeletion {
index_uid,
tasks: tasks.into_iter().map(|task| task.uid).collect(),
index_has_been_created,
}
}
Batch::IndexSwap { task } => CBatch::IndexSwap { task: task.uid },
}
}
}
impl From<IndexOperation> for cluster::batch::IndexOperation {
fn from(op: IndexOperation) -> Self {
use cluster::batch::IndexOperation as COp;
match op {
IndexOperation::DocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations,
tasks,
} => COp::DocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations: operations.into_iter().map(|op| op.into()).collect(),
tasks: tasks.into_iter().map(|task| task.uid).collect(),
},
IndexOperation::DocumentDeletion { index_uid, documents, tasks } => {
COp::DocumentDeletion {
index_uid,
documents,
tasks: tasks.into_iter().map(|task| task.uid).collect(),
}
}
IndexOperation::DocumentClear { index_uid, tasks } => COp::DocumentClear {
index_uid,
tasks: tasks.into_iter().map(|task| task.uid).collect(),
},
IndexOperation::Settings { index_uid, settings, tasks } => COp::Settings {
index_uid,
settings,
tasks: tasks.into_iter().map(|task| task.uid).collect(),
},
IndexOperation::DocumentClearAndSetting {
index_uid,
cleared_tasks,
settings,
settings_tasks,
} => COp::DocumentClearAndSetting {
index_uid,
cleared_tasks: cleared_tasks.into_iter().map(|task| task.uid).collect(),
settings,
settings_tasks: settings_tasks.into_iter().map(|task| task.uid).collect(),
},
IndexOperation::SettingsAndDocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations,
document_import_tasks,
settings,
settings_tasks,
} => COp::SettingsAndDocumentOperation {
index_uid,
primary_key,
method,
documents_counts,
operations: operations.into_iter().map(|op| op.into()).collect(),
document_import_tasks: document_import_tasks
.into_iter()
.map(|task| task.uid)
.collect(),
settings,
settings_tasks: settings_tasks.into_iter().map(|task| task.uid).collect(),
},
}
}
}
impl From<DocumentOperation> for cluster::batch::DocumentOperation {
fn from(op: DocumentOperation) -> Self {
use cluster::batch::DocumentOperation as COp;
match op {
DocumentOperation::Add(uuid) => COp::Add(uuid),
DocumentOperation::Delete(docs) => COp::Delete(docs),
}
}
}
impl From<cluster::batch::DocumentOperation> for DocumentOperation {
fn from(op: cluster::batch::DocumentOperation) -> Self {
use cluster::batch::DocumentOperation as COp;
match op {
COp::Add(uuid) => DocumentOperation::Add(uuid),
COp::Delete(docs) => DocumentOperation::Delete(docs),
}
}
}

View File

@@ -4,10 +4,11 @@ use std::time::Duration;
use std::{fs, thread};
use log::error;
use meilisearch_types::heed::types::Str;
use meilisearch_types::heed::types::{SerdeJson, Str};
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn};
use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::milli::Index;
use meilisearch_types::milli::{FieldDistribution, Index};
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use uuid::Uuid;
@@ -19,6 +20,7 @@ use crate::{Error, Result};
mod index_map;
const INDEX_MAPPING: &str = "index-mapping";
const INDEX_STATS: &str = "index-stats";
/// Structure managing meilisearch's indexes.
///
@@ -52,6 +54,11 @@ pub struct IndexMapper {
/// Map an index name with an index uuid currently available on disk.
pub(crate) index_mapping: Database<Str, UuidCodec>,
/// Map an index UUID with the cached stats associated to the index.
///
/// Using an UUID forces to use the index_mapping table to recover the index behind a name, ensuring
/// consistency wrt index swapping.
pub(crate) index_stats: Database<UuidCodec, SerdeJson<IndexStats>>,
/// Path to the folder where the LMDB environments of each index are.
base_path: PathBuf,
@@ -76,6 +83,39 @@ pub enum IndexStatus {
Available(Index),
}
/// The statistics that can be computed from an `Index` object.
#[derive(Serialize, Deserialize, Debug)]
pub struct IndexStats {
/// Number of documents in the index.
pub number_of_documents: u64,
/// Size of the index' DB, in bytes.
pub database_size: u64,
/// Association of every field name with the number of times it occurs in the documents.
pub field_distribution: FieldDistribution,
/// Creation date of the index.
pub created_at: OffsetDateTime,
/// Date of the last update of the index.
pub updated_at: OffsetDateTime,
}
impl IndexStats {
/// Compute the stats of an index
///
/// # Parameters
///
/// - rtxn: a RO transaction for the index, obtained from `Index::read_txn()`.
pub fn new(index: &Index, rtxn: &RoTxn) -> Result<Self> {
let database_size = index.on_disk_size()?;
Ok(IndexStats {
number_of_documents: index.number_of_documents(rtxn)?,
database_size,
field_distribution: index.field_distribution(rtxn)?,
created_at: index.created_at(rtxn)?,
updated_at: index.updated_at(rtxn)?,
})
}
}
impl IndexMapper {
pub fn new(
env: &Env,
@@ -88,6 +128,7 @@ impl IndexMapper {
Ok(Self {
index_map: Arc::new(RwLock::new(IndexMap::new(index_count))),
index_mapping: env.create_database(Some(INDEX_MAPPING))?,
index_stats: env.create_database(Some(INDEX_STATS))?,
base_path,
index_base_map_size,
index_growth_amount,
@@ -140,6 +181,9 @@ impl IndexMapper {
.get(&wtxn, name)?
.ok_or_else(|| Error::IndexNotFound(name.to_string()))?;
// Not an error if the index had no stats in cache.
self.index_stats.delete(&mut wtxn, &uuid)?;
// Once we retrieved the UUID of the index we remove it from the mapping table.
assert!(self.index_mapping.delete(&mut wtxn, name)?);
@@ -360,6 +404,45 @@ impl IndexMapper {
Ok(())
}
/// The stats of an index.
///
/// If available in the cache, they are directly returned.
/// Otherwise, the `Index` is opened to compute the stats on the fly (the result is not cached).
/// The stats for an index are cached after each `Index` update.
pub fn stats_of(&self, rtxn: &RoTxn, index_uid: &str) -> Result<IndexStats> {
let uuid = self
.index_mapping
.get(rtxn, index_uid)?
.ok_or_else(|| Error::IndexNotFound(index_uid.to_string()))?;
match self.index_stats.get(rtxn, &uuid)? {
Some(stats) => Ok(stats),
None => {
let index = self.index(rtxn, index_uid)?;
let index_rtxn = index.read_txn()?;
IndexStats::new(&index, &index_rtxn)
}
}
}
/// Stores the new stats for an index.
///
/// Expected usage is to compute the stats the index using `IndexStats::new`, the pass it to this function.
pub fn store_stats_of(
&self,
wtxn: &mut RwTxn,
index_uid: &str,
stats: &IndexStats,
) -> Result<()> {
let uuid = self
.index_mapping
.get(wtxn, index_uid)?
.ok_or_else(|| Error::IndexNotFound(index_uid.to_string()))?;
self.index_stats.put(wtxn, &uuid, stats)?;
Ok(())
}
pub fn index_exists(&self, rtxn: &RoTxn, name: &str) -> Result<bool> {
Ok(self.index_mapping.get(rtxn, name)?.is_some())
}

View File

@@ -33,8 +33,6 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
snapshots_path: _,
auth_path: _,
version_file_path: _,
cluster: _,
consistency_level: _,
test_breakpoint_sdr: _,
planned_failures: _,
run_loop_iteration: _,
@@ -256,6 +254,16 @@ pub fn snapshot_canceled_by(
snap
}
pub fn snapshot_index_mapper(rtxn: &RoTxn, mapper: &IndexMapper) -> String {
let mut s = String::new();
let names = mapper.index_names(rtxn).unwrap();
format!("{names:?}")
for name in names {
let stats = mapper.stats_of(rtxn, &name).unwrap();
s.push_str(&format!(
"{name}: {{ number_of_documents: {}, field_distribution: {:?} }}\n",
stats.number_of_documents, stats.field_distribution
));
}
s
}

View File

@@ -31,7 +31,6 @@ mod uuid_codec;
pub type Result<T> = std::result::Result<T, Error>;
pub type TaskId = u32;
use std::io::Write;
use std::ops::{Bound, RangeBounds};
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool;
@@ -39,22 +38,17 @@ use std::sync::atomic::Ordering::Relaxed;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use batch::Batch;
use cluster::{Cluster, Consistency};
use dump::{KindDump, TaskDump, UpdateFile};
pub use error::Error;
use file_store::FileStore;
use log::info;
use meilisearch_types::error::ResponseError;
use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{self, Database, Env, RoTxn};
use meilisearch_types::milli;
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::milli::{CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32};
use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32};
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
use roaring::RoaringBitmap;
use serde::Deserialize;
use synchronoise::SignalEvent;
use time::OffsetDateTime;
use utils::{filter_out_references_to_newer_tasks, keep_tasks_within_datetimes, map_bound};
@@ -307,11 +301,6 @@ pub struct IndexScheduler {
/// The path to the version file of Meilisearch.
pub(crate) version_file_path: PathBuf,
/// The role in the cluster
pub(crate) cluster: Option<Cluster>,
/// The Consistency level used by the leader. Ignored if the node is not in a leader in cluster mode.
pub(crate) consistency_level: Consistency,
// ================= test
// The next entry is dedicated to the tests.
/// Provide a way to set a breakpoint in multiple part of the scheduler.
@@ -331,24 +320,6 @@ pub struct IndexScheduler {
run_loop_iteration: Arc<RwLock<usize>>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize)]
pub enum ClusterMode {
Leader,
Follower,
}
impl std::str::FromStr for ClusterMode {
type Err = ();
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s {
"leader" => Ok(ClusterMode::Leader),
"follower" => Ok(ClusterMode::Follower),
_ => Err(()),
}
}
}
impl IndexScheduler {
fn private_clone(&self) -> IndexScheduler {
IndexScheduler {
@@ -371,8 +342,6 @@ impl IndexScheduler {
dumps_path: self.dumps_path.clone(),
auth_path: self.auth_path.clone(),
version_file_path: self.version_file_path.clone(),
cluster: self.cluster.clone(),
consistency_level: self.consistency_level,
#[cfg(test)]
test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
#[cfg(test)]
@@ -387,8 +356,6 @@ impl IndexScheduler {
/// Create an index scheduler and start its run loop.
pub fn new(
options: IndexSchedulerOptions,
cluster: Option<Cluster>,
consistency_level: Consistency,
#[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<(Breakpoint, bool)>,
#[cfg(test)] planned_failures: Vec<(usize, tests::FailureLocation)>,
) -> Result<Self> {
@@ -448,8 +415,6 @@ impl IndexScheduler {
snapshots_path: options.snapshots_path,
auth_path: options.auth_path,
version_file_path: options.version_file_path,
cluster,
consistency_level,
#[cfg(test)]
test_breakpoint_sdr,
@@ -542,26 +507,6 @@ impl IndexScheduler {
/// only once per index scheduler.
fn run(&self) {
let run = self.private_clone();
// if we're a follower we starts a thread to register the tasks coming from the leader
if let Some(Cluster::Follower(ref follower)) = self.cluster {
let this = self.private_clone();
let follower = follower.clone();
std::thread::spawn(move || loop {
let (task, content) = follower.get_new_task();
this.register_raw_task(task, content);
});
} else if let Some(Cluster::Leader(ref leader)) = self.cluster {
// we need a way to let the leader come out of its loop if a new follower joins the cluster
let cluster = leader.wake_up.clone();
let scheduler = self.wake_up.clone();
std::thread::spawn(move || loop {
cluster.wait();
scheduler.signal();
});
}
std::thread::Builder::new()
.name(String::from("scheduler"))
.spawn(move || {
@@ -620,7 +565,7 @@ impl IndexScheduler {
}
/// Return the name of all indexes without opening them.
pub fn index_names(self) -> Result<Vec<String>> {
pub fn index_names(&self) -> Result<Vec<String>> {
let rtxn = self.env.read_txn()?;
self.index_mapper.index_names(&rtxn)
}
@@ -919,16 +864,6 @@ impl IndexScheduler {
return Err(e.into());
}
if let Some(Cluster::Leader(leader)) = &self.cluster {
let update_file = if let Some(uuid) = task.content_uuid() {
let path = self.file_store.get_update_path(uuid);
Some(std::fs::read(path).unwrap())
} else {
None
};
leader.register_new_task(task.clone(), update_file);
}
// If the registered task is a task cancelation
// we inform the processing tasks to stop (if necessary).
if let KindWithContent::TaskCancelation { tasks, .. } = kind {
@@ -1058,44 +993,6 @@ impl IndexScheduler {
Ok(task)
}
/// /!\ should only be used when you're a follower in cluster mode
pub fn register_raw_task(&self, task: Task, content_file: Option<Vec<u8>>) {
if let Some(content) = content_file {
let uuid = task.content_uuid().expect("bad task");
let (_, mut file) = self.file_store.new_update_with_uuid(uuid.as_u128()).unwrap();
file.write_all(&content).unwrap();
file.persist().unwrap();
}
let mut wtxn = self.env.write_txn().unwrap();
self.all_tasks.put(&mut wtxn, &BEU32::new(task.uid), &task).unwrap();
for index in task.indexes() {
self.update_index(&mut wtxn, index, |bitmap| {
bitmap.insert(task.uid);
})
.unwrap();
}
self.update_status(&mut wtxn, task.status, |bitmap| {
bitmap.insert(task.uid);
})
.unwrap();
self.update_kind(&mut wtxn, task.kind.as_kind(), |bitmap| {
(bitmap.insert(task.uid));
})
.unwrap();
utils::insert_task_datetime(&mut wtxn, self.enqueued_at, task.enqueued_at, task.uid)
.unwrap();
wtxn.commit().unwrap();
self.wake_up.signal();
}
/// Create a new index without any associated task.
pub fn create_raw_index(
&self,
@@ -1152,15 +1049,14 @@ impl IndexScheduler {
self.breakpoint(Breakpoint::Start);
}
info!("before getting a new batch");
let batch = match self.get_or_create_next_batch()? {
Some(batch) => batch,
None => return Ok(TickOutcome::WaitForSignal),
};
info!("after getting a new batch");
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
let batch =
match self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))? {
Some(batch) => batch,
None => return Ok(TickOutcome::WaitForSignal),
};
let index_uid = batch.index_uid().map(ToOwned::to_owned);
// TODO cluster: Should we send the starting date as well so everyone is in sync?
drop(rtxn);
// 1. store the starting date with the bitmap of processing tasks.
let mut ids = batch.ids();
@@ -1289,61 +1185,12 @@ impl IndexScheduler {
Ok(TickOutcome::TickAgain(processed_tasks))
}
/// If there is no cluster or if leader -> create a new batch
/// If follower -> wait till the leader gives us a batch to process
fn get_or_create_next_batch(&self) -> Result<Option<Batch>> {
info!("inside get or create next batch");
pub fn index_stats(&self, index_uid: &str) -> Result<IndexStats> {
let is_indexing = self.is_index_processing(index_uid)?;
let rtxn = self.read_txn()?;
let index_stats = self.index_mapper.stats_of(&rtxn, index_uid)?;
let batch = match &self.cluster {
None | Some(Cluster::Leader(_)) => {
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))?
}
Some(Cluster::Follower(follower)) => {
let batch = follower.get_new_batch();
Some(self.get_batch_from_cluster_batch(batch)?)
}
};
if let Some(Cluster::Leader(leader)) = &self.cluster {
// first, onboard the new followers
if leader.has_new_followers() {
info!("New followers are trying to join the cluster");
let started_at = OffsetDateTime::now_utc();
let dump = self
.create_dump(
&Task {
uid: TaskId::MAX,
enqueued_at: started_at,
started_at: Some(started_at),
finished_at: Some(started_at),
error: None,
canceled_by: None,
details: None,
status: Status::Enqueued,
kind: KindWithContent::DumpCreation {
keys: leader.get_keys(),
// TODO cluster: should we unify the instance_uid between every instances?
instance_uid: None,
},
},
&started_at,
)
.unwrap();
let mut buffer = Vec::new();
// TODO cluster: stop writing everything in RAM
dump.persist_to(&mut buffer).unwrap();
leader.join_me(buffer);
}
// second, starts processing the batch
if let Some(ref batch) = batch {
leader.starts_batch(batch.clone().into());
}
}
Ok(batch)
Ok(IndexStats { is_indexing, inner_stats: index_stats })
}
pub(crate) fn delete_persisted_task_data(&self, task: &Task) -> Result<()> {
@@ -1398,6 +1245,17 @@ struct IndexBudget {
task_db_size: usize,
}
/// The statistics that can be computed from an `Index` object and the scheduler.
///
/// Compared with `index_mapper::IndexStats`, it adds the scheduling status.
#[derive(Debug)]
pub struct IndexStats {
/// Whether this index is currently performing indexation, according to the scheduler.
pub is_indexing: bool,
/// Internal stats computed from the index.
pub inner_stats: index_mapper::IndexStats,
}
#[cfg(test)]
mod tests {
use std::io::{BufWriter, Seek, Write};
@@ -1461,8 +1319,7 @@ mod tests {
autobatching_enabled,
};
let index_scheduler =
Self::new(options, None, Consistency::default(), sender, planned_failures).unwrap();
let index_scheduler = Self::new(options, sender, planned_failures).unwrap();
// To be 100% consistent between all test we're going to start the scheduler right now
// and ensure it's in the expected starting state.

View File

@@ -1,6 +1,5 @@
---
source: index-scheduler/src/lib.rs
assertion_line: 1755
---
### Autobatching Enabled = true
### Processing Tasks:
@@ -23,7 +22,7 @@ canceled [0,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:
1 [0,]

View File

@@ -20,7 +20,7 @@ enqueued [0,1,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -25,7 +25,9 @@ catto [0,]
wolfo [2,]
----------------------------------------------------------------------
### Index Mapper:
["beavero", "catto"]
beavero: { number_of_documents: 0, field_distribution: {} }
catto: { number_of_documents: 1, field_distribution: {"id": 1} }
----------------------------------------------------------------------
### Canceled By:

View File

@@ -1,6 +1,5 @@
---
source: index-scheduler/src/lib.rs
assertion_line: 1859
---
### Autobatching Enabled = true
### Processing Tasks:
@@ -27,7 +26,9 @@ catto [0,]
wolfo [2,]
----------------------------------------------------------------------
### Index Mapper:
["beavero", "catto"]
beavero: { number_of_documents: 0, field_distribution: {} }
catto: { number_of_documents: 1, field_distribution: {"id": 1} }
----------------------------------------------------------------------
### Canceled By:
3 [1,2,]

View File

@@ -23,7 +23,8 @@ catto [0,]
wolfo [2,]
----------------------------------------------------------------------
### Index Mapper:
["catto"]
catto: { number_of_documents: 1, field_distribution: {"id": 1} }
----------------------------------------------------------------------
### Canceled By:

View File

@@ -25,7 +25,8 @@ catto [0,]
wolfo [2,]
----------------------------------------------------------------------
### Index Mapper:
["catto"]
catto: { number_of_documents: 1, field_distribution: {"id": 1} }
----------------------------------------------------------------------
### Canceled By:

View File

@@ -20,7 +20,8 @@ enqueued [0,1,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
["catto"]
catto: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:

View File

@@ -1,6 +1,5 @@
---
source: index-scheduler/src/lib.rs
assertion_line: 1818
---
### Autobatching Enabled = true
### Processing Tasks:
@@ -23,7 +22,8 @@ canceled [0,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
["catto"]
catto: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:
1 [0,]

View File

@@ -20,7 +20,7 @@ enqueued [0,1,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -18,7 +18,7 @@ enqueued [0,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -18,7 +18,7 @@ enqueued [0,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -21,7 +21,8 @@ succeeded [0,1,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
["catto"]
catto: { number_of_documents: 1, field_distribution: {"id": 1} }
----------------------------------------------------------------------
### Canceled By:
1 []

View File

@@ -19,7 +19,8 @@ succeeded [0,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
["catto"]
catto: { number_of_documents: 1, field_distribution: {"id": 1} }
----------------------------------------------------------------------
### Canceled By:

View File

@@ -18,7 +18,7 @@ enqueued [0,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -27,7 +27,10 @@ doggos [0,3,]
girafos [2,5,]
----------------------------------------------------------------------
### Index Mapper:
["cattos", "doggos", "girafos"]
cattos: { number_of_documents: 0, field_distribution: {} }
doggos: { number_of_documents: 0, field_distribution: {} }
girafos: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:

View File

@@ -18,7 +18,7 @@ enqueued [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -18,7 +18,7 @@ enqueued [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -19,7 +19,8 @@ succeeded [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
["doggos"]
doggos: { number_of_documents: 1, field_distribution: {"doggo": 1, "id": 1} }
----------------------------------------------------------------------
### Canceled By:

View File

@@ -21,7 +21,8 @@ succeeded [0,1,]
doggos [0,1,]
----------------------------------------------------------------------
### Index Mapper:
["doggos"]
doggos: { number_of_documents: 1, field_distribution: {"doggo": 1, "id": 1} }
----------------------------------------------------------------------
### Canceled By:

View File

@@ -18,7 +18,7 @@ enqueued [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -20,7 +20,7 @@ enqueued [0,1,]
doggos [0,1,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -23,7 +23,8 @@ succeeded [0,]
doggos [0,1,2,]
----------------------------------------------------------------------
### Index Mapper:
["doggos"]
doggos: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:

View File

@@ -23,7 +23,7 @@ succeeded [0,1,2,]
doggos [0,1,2,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -18,7 +18,7 @@ enqueued [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -20,7 +20,7 @@ enqueued [0,1,]
doggos [0,1,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -22,7 +22,7 @@ enqueued [0,1,2,]
doggos [0,1,2,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -20,7 +20,7 @@ enqueued [0,1,]
doggos [0,1,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -21,7 +21,7 @@ succeeded [0,1,]
doggos [0,1,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -21,7 +21,7 @@ failed [0,]
doggos [0,1,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -22,7 +22,8 @@ failed [0,]
doggos [0,1,]
----------------------------------------------------------------------
### Index Mapper:
["doggos"]
doggos: { number_of_documents: 3, field_distribution: {"catto": 1, "doggo": 2, "id": 3} }
----------------------------------------------------------------------
### Canceled By:

View File

@@ -18,7 +18,7 @@ enqueued [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -20,7 +20,7 @@ enqueued [0,1,]
doggos [0,1,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -18,7 +18,7 @@ enqueued [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -19,7 +19,7 @@ failed [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -18,7 +18,7 @@ enqueued [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -18,7 +18,7 @@ enqueued [0,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -19,7 +19,7 @@ failed [0,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -18,7 +18,8 @@ enqueued [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
["doggos"]
doggos: { number_of_documents: 1, field_distribution: {"doggo": 1, "id": 1} }
----------------------------------------------------------------------
### Canceled By:

View File

@@ -18,7 +18,8 @@ enqueued [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
["doggos"]
doggos: { number_of_documents: 1, field_distribution: {"doggo": 1, "id": 1} }
----------------------------------------------------------------------
### Canceled By:

View File

@@ -18,7 +18,7 @@ enqueued [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -18,7 +18,7 @@ enqueued [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -19,7 +19,8 @@ succeeded [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
["doggos"]
doggos: { number_of_documents: 1, field_distribution: {"doggo": 1, "id": 1} }
----------------------------------------------------------------------
### Canceled By:

View File

@@ -18,7 +18,7 @@ enqueued [0,]
index_a [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -18,7 +18,7 @@ enqueued [0,]
index_a [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -20,7 +20,7 @@ index_a [0,]
index_b [1,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -22,7 +22,7 @@ index_a [0,2,]
index_b [1,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -19,7 +19,7 @@ failed [0,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -18,7 +18,7 @@ enqueued [0,]
catto [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -23,7 +23,8 @@ cattos [1,]
doggos [0,2,]
----------------------------------------------------------------------
### Index Mapper:
["doggos"]
doggos: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:

View File

@@ -23,7 +23,9 @@ cattos [1,]
doggos [0,2,]
----------------------------------------------------------------------
### Index Mapper:
["cattos", "doggos"]
cattos: { number_of_documents: 0, field_distribution: {} }
doggos: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:

View File

@@ -23,7 +23,8 @@ cattos [1,]
doggos [0,2,]
----------------------------------------------------------------------
### Index Mapper:
["cattos"]
cattos: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:

View File

@@ -18,7 +18,7 @@ enqueued [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -20,7 +20,7 @@ cattos [1,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -22,7 +22,7 @@ cattos [1,]
doggos [0,2,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -23,7 +23,8 @@ succeeded [0,]
doggos [0,1,2,3,]
----------------------------------------------------------------------
### Index Mapper:
["doggos"]
doggos: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:

View File

@@ -23,7 +23,8 @@ succeeded [0,1,2,3,]
doggos [0,1,2,3,]
----------------------------------------------------------------------
### Index Mapper:
["doggos"]
doggos: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:

View File

@@ -18,7 +18,7 @@ enqueued [0,]
doggos [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -22,7 +22,7 @@ enqueued [0,1,2,3,]
doggos [0,1,2,3,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -20,7 +20,7 @@ enqueued [0,1,]
doggos [0,1,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -21,7 +21,7 @@ enqueued [0,1,2,]
doggos [0,1,2,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -23,7 +23,8 @@ succeeded [0,1,]
doggos [0,1,2,3,]
----------------------------------------------------------------------
### Index Mapper:
["doggos"]
doggos: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:

View File

@@ -23,7 +23,8 @@ succeeded [0,1,2,]
doggos [0,1,2,3,]
----------------------------------------------------------------------
### Index Mapper:
["doggos"]
doggos: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:

View File

@@ -26,7 +26,8 @@ catto [0,2,]
doggo [1,2,]
----------------------------------------------------------------------
### Index Mapper:
["catto"]
catto: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:
3 [1,2,]

View File

@@ -23,7 +23,10 @@ doggo [0,]
whalo [1,]
----------------------------------------------------------------------
### Index Mapper:
["catto", "doggo", "whalo"]
catto: { number_of_documents: 0, field_distribution: {} }
doggo: { number_of_documents: 0, field_distribution: {} }
whalo: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:

View File

@@ -18,7 +18,7 @@ enqueued [0,]
doggo [0,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -20,7 +20,7 @@ doggo [0,]
whalo [1,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -22,7 +22,7 @@ doggo [0,]
whalo [1,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -24,7 +24,9 @@ doggo [1,]
whalo [2,]
----------------------------------------------------------------------
### Index Mapper:
["catto", "doggo"]
catto: { number_of_documents: 0, field_distribution: {} }
doggo: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:

View File

@@ -22,7 +22,7 @@ doggo [1,]
whalo [2,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -24,7 +24,7 @@ doggo [1,2,]
whalo [3,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -23,7 +23,7 @@ catto [0,1,2,]
doggo [3,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
### Canceled By:

View File

@@ -25,7 +25,8 @@ c [2,]
d [3,]
----------------------------------------------------------------------
### Index Mapper:
["a"]
a: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:

View File

@@ -25,7 +25,9 @@ c [2,]
d [3,]
----------------------------------------------------------------------
### Index Mapper:
["a", "b"]
a: { number_of_documents: 0, field_distribution: {} }
b: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:

View File

@@ -25,7 +25,10 @@ c [2,]
d [3,]
----------------------------------------------------------------------
### Index Mapper:
["a", "b", "c"]
a: { number_of_documents: 0, field_distribution: {} }
b: { number_of_documents: 0, field_distribution: {} }
c: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:

View File

@@ -25,7 +25,11 @@ c [2,]
d [3,]
----------------------------------------------------------------------
### Index Mapper:
["a", "b", "c", "d"]
a: { number_of_documents: 0, field_distribution: {} }
b: { number_of_documents: 0, field_distribution: {} }
c: { number_of_documents: 0, field_distribution: {} }
d: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:

View File

@@ -28,7 +28,11 @@ c [3,4,5,]
d [2,4,]
----------------------------------------------------------------------
### Index Mapper:
["a", "b", "c", "d"]
a: { number_of_documents: 0, field_distribution: {} }
b: { number_of_documents: 0, field_distribution: {} }
c: { number_of_documents: 0, field_distribution: {} }
d: { number_of_documents: 0, field_distribution: {} }
----------------------------------------------------------------------
### Canceled By:

Some files were not shown because too many files have changed in this diff Show More