start implementing a skeleton of the v1 dump reader

This commit is contained in:
Tamo
2022-10-03 13:57:18 +02:00
committed by Clément Renault
parent f041d474a5
commit 699ae1b190
17 changed files with 1613 additions and 0 deletions

177
dump/src/reader/v1/mod.rs Normal file
View File

@ -0,0 +1,177 @@
use std::{
convert::Infallible,
fs::{self, File},
io::{BufRead, BufReader},
path::Path,
};
use serde::Deserialize;
use tempfile::TempDir;
use time::OffsetDateTime;
use self::update::UpdateStatus;
use super::{DumpReader, IndexReader};
use crate::{Error, Result, Version};
pub mod settings;
pub mod update;
pub mod v1;
/// Reader over an extracted v1 dump.
///
/// Built by `V1Reader::open`, which parses `metadata.json` and creates one
/// `V1IndexReader` per sub-directory of the dump.
pub struct V1Reader {
/// Temporary directory the dump lives in; stored here so the reader owns it.
dump: TempDir,
/// Deserialized content of the dump's `metadata.json`.
metadata: v1::Metadata,
/// One reader per directory found at the root of the dump.
indexes: Vec<V1IndexReader>,
}
/// Reader over the files of a single index directory inside a v1 dump.
struct V1IndexReader {
    /// Name of the index, as given to [`V1IndexReader::new`].
    name: String,
    /// Handle on the index's `documents.jsonl`.
    documents: File,
    /// Handle on the index's `settings.json`.
    settings: File,
    /// Buffered handle on the index's `updates.jsonl`.
    ///
    /// The `BufReader` is stored (instead of being rebuilt on every call) so
    /// that bytes it has read ahead are not lost between two updates.
    updates: BufReader<File>,
    /// Look-ahead slot: the next update that has been parsed but not yet
    /// handed out. `None` once `updates.jsonl` is exhausted.
    current_update: Option<UpdateStatus>,
}

impl V1IndexReader {
    /// Opens the three files of the index stored under `path` and pre-loads
    /// its first update.
    ///
    /// # Errors
    ///
    /// Fails if `documents.jsonl`, `settings.json` or `updates.jsonl` cannot
    /// be opened, or if the first line of `updates.jsonl` cannot be read or
    /// deserialized.
    pub fn new(name: String, path: &Path) -> Result<Self> {
        let mut ret = V1IndexReader {
            name,
            documents: File::open(path.join("documents.jsonl"))?,
            settings: File::open(path.join("settings.json"))?,
            updates: BufReader::new(File::open(path.join("updates.jsonl"))?),
            current_update: None,
        };
        // Fill the look-ahead slot. The value returned here is the initial
        // `None`, but a read/parse error must not be silently dropped.
        ret.next_update()?;
        Ok(ret)
    }

    /// Returns the update currently held in the look-ahead slot and replaces
    /// it with the next line of `updates.jsonl` (or `None` at end of file).
    ///
    /// # Errors
    ///
    /// Fails if the next line cannot be read or is not a valid
    /// `UpdateStatus`; the look-ahead slot is left untouched in that case.
    pub fn next_update(&mut self) -> Result<Option<UpdateStatus>> {
        let mut line = String::new();
        // `read_line` returns 0 only at end of file.
        let upcoming = if self.updates.read_line(&mut line)? == 0 {
            None
        } else {
            Some(serde_json::from_str(&line)?)
        };
        Ok(std::mem::replace(&mut self.current_update, upcoming))
    }
}
impl V1Reader {
pub fn open(dump: TempDir) -> Result<Self> {
let mut meta_file = fs::read(dump.path().join("metadata.json"))?;
let metadata = serde_json::from_reader(&*meta_file)?;
let mut indexes = Vec::new();
let entries = fs::read_dir(dump.path())?;
for entry in entries {
let entry = entry?;
if entry.file_type()?.is_dir() {
indexes.push(V1IndexReader::new(
entry
.file_name()
.to_str()
.ok_or(Error::BadIndexName)?
.to_string(),
&entry.path(),
)?);
}
}
Ok(V1Reader {
dump,
metadata,
indexes,
})
}
pub fn date(&self) -> Result<Option<OffsetDateTime>> {
Ok(None)
}
fn next_update(&mut self) -> Result<Option<UpdateStatus>> {
if let Some((idx, _)) = self
.indexes
.iter()
.map(|index| index.current_update)
.enumerate()
.filter_map(|(idx, update)| update.map(|u| (idx, u)))
.min_by_key(|(_, update)| update.enqueued_at())
{
self.indexes[idx].next_update()
} else {
Ok(None)
}
}
}
/// `IndexReader` implementation for a borrowed `V1IndexReader`.
///
/// Only `name` is implemented so far; the other methods are still stubs.
impl IndexReader for &V1IndexReader {
    type Document = serde_json::Value;
    type Settings = settings::Settings;

    /// Returns the name stored in the reader.
    fn name(&self) -> &str {
        &self.name
    }

    /// Not implemented yet: calling this panics.
    fn documents(&self) -> Result<Box<dyn Iterator<Item = Self::Document>>> {
        todo!()
    }

    /// Not implemented yet: calling this panics.
    fn settings(&self) -> Result<Self::Settings> {
        todo!()
    }
}
/// `DumpReader` implementation for v1 dumps.
///
/// NOTE(review): `indexes` and `tasks` below look like they cannot type-check
/// against the signatures written here (see the notes on each method); the
/// `DumpReader` trait itself is not visible from this file, so confirm there.
impl DumpReader for V1Reader {
type Document = serde_json::Value;
type Settings = settings::Settings;
type Task = update::UpdateStatus;
// `tasks` always yields `None` as update file, hence the unit type.
type UpdateFile = ();
// `keys` returns an empty iterator, so no key value is ever produced.
type Key = Infallible;
/// Always `Ok(None)`: this reader reports no dump date.
fn date(&self) -> Result<Option<OffsetDateTime>> {
Ok(None)
}
/// Always `Version::V1`.
fn version(&self) -> Version {
Version::V1
}
/// Iterates over the indexes, boxing a `&V1IndexReader` for each of them
/// (the trait is implemented on the reference type).
///
/// NOTE(review): the returned iterator borrows `self.indexes`, but the
/// boxed trait objects in the signature carry no explicit lifetime --
/// confirm this is accepted by the trait definition.
fn indexes(
&self,
) -> Result<
Box<
dyn Iterator<
Item = Box<
dyn super::IndexReader<Document = Self::Document, Settings = Self::Settings>,
>,
>,
>,
> {
Ok(Box::new(self.indexes.iter().map(|index| {
Box::new(index)
as Box<dyn IndexReader<Document = Self::Document, Settings = Self::Settings>>
})))
}
/// Iterates over the updates by repeatedly calling `V1Reader::next_update`
/// until it returns `Ok(None)`; errors are yielded as `Err` items and every
/// task is paired with `None` as update file.
///
/// NOTE(review): `V1Reader::next_update` takes `&mut self` while only
/// `&self` is available here -- confirm how this is meant to borrow.
fn tasks(
&self,
) -> Result<Box<dyn Iterator<Item = Result<(Self::Task, Option<Self::UpdateFile>)>>>> {
Ok(Box::new(std::iter::from_fn(|| {
self.next_update()
.transpose()
.map(|result| result.map(|task| (task, None)))
})))
}
/// Always an empty iterator.
fn keys(&self) -> Result<Box<dyn Iterator<Item = Self::Key>>> {
Ok(Box::new(std::iter::empty()))
}
}

View File

@ -0,0 +1,63 @@
use std::collections::{BTreeMap, BTreeSet};
use std::result::Result as StdResult;
use serde::{Deserialize, Deserializer, Serialize};
/// Index settings in their v1 shape.
///
/// Keys are camelCase once (de)serialized, and unknown keys are rejected when
/// deserializing. Every field is a double `Option`: thanks to `default` combined
/// with `deserialize_some`, the outer `Option` is `None` when the key is absent
/// and `Some(..)` when it is present, while the inner `Option` is `None` when
/// the key is present with a `null` value.
#[derive(Default, Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase", deny_unknown_fields)]
pub struct Settings {
#[serde(default, deserialize_with = "deserialize_some")]
pub ranking_rules: Option<Option<Vec<String>>>,
#[serde(default, deserialize_with = "deserialize_some")]
pub distinct_attribute: Option<Option<String>>,
#[serde(default, deserialize_with = "deserialize_some")]
pub searchable_attributes: Option<Option<Vec<String>>>,
#[serde(default, deserialize_with = "deserialize_some")]
pub displayed_attributes: Option<Option<BTreeSet<String>>>,
#[serde(default, deserialize_with = "deserialize_some")]
pub stop_words: Option<Option<BTreeSet<String>>>,
#[serde(default, deserialize_with = "deserialize_some")]
pub synonyms: Option<Option<BTreeMap<String, Vec<String>>>>,
#[serde(default, deserialize_with = "deserialize_some")]
pub attributes_for_faceting: Option<Option<Vec<String>>>,
}
/// Settings change carried by `UpdateData::Settings` and `UpdateType::Settings`.
///
/// Each setting is wrapped in an `UpdateState`. Unlike `Settings`, this struct
/// has no `rename_all`, so its keys keep their snake_case names.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SettingsUpdate {
pub ranking_rules: UpdateState<Vec<RankingRule>>,
pub distinct_attribute: UpdateState<String>,
pub primary_key: UpdateState<String>,
pub searchable_attributes: UpdateState<Vec<String>>,
pub displayed_attributes: UpdateState<BTreeSet<String>>,
pub stop_words: UpdateState<BTreeSet<String>>,
pub synonyms: UpdateState<BTreeMap<String, Vec<String>>>,
pub attributes_for_faceting: UpdateState<Vec<String>>,
}
/// State of one setting inside a `SettingsUpdate`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateState<T> {
/// Carries a new value of type `T`.
Update(T),
// NOTE(review): presumably "reset this setting" -- confirm against the v1 engine.
Clear,
// NOTE(review): presumably "leave this setting untouched" -- confirm.
Nothing,
}
/// Ranking rule as stored in `SettingsUpdate::ranking_rules`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RankingRule {
Typo,
Words,
Proximity,
Attribute,
WordsPosition,
Exactness,
// NOTE(review): the `String` of `Asc`/`Desc` is presumably the attribute to
// sort on -- confirm.
Asc(String),
Desc(String),
}
// Any value that is present is considered Some value, including null.
fn deserialize_some<'de, T, D>(deserializer: D) -> StdResult<Option<T>, D::Error>
where
T: Deserialize<'de>,
D: Deserializer<'de>,
{
Deserialize::deserialize(deserializer).map(Some)
}

View File

@ -0,0 +1,120 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;
use time::OffsetDateTime;
use super::settings::SettingsUpdate;
/// An update payload together with the time it was enqueued.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Update {
/// What the update does.
data: UpdateData,
/// Enqueue time, (de)serialized as an RFC 3339 string.
#[serde(with = "time::serde::rfc3339")]
enqueued_at: OffsetDateTime,
}
/// Full payload of an update; `update_type` derives the lighter `UpdateType`
/// summary from it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateData {
ClearAll,
/// Opaque bytes.
Customs(Vec<u8>),
// (primary key, documents)
DocumentsAddition {
primary_key: Option<String>,
documents: Vec<serde_json::Map<String, Value>>,
},
DocumentsPartial {
primary_key: Option<String>,
documents: Vec<serde_json::Map<String, Value>>,
},
// NOTE(review): the strings are presumably document ids -- confirm.
DocumentsDeletion(Vec<String>),
/// Boxed settings change.
Settings(Box<SettingsUpdate>),
}
impl UpdateData {
    /// Summarizes this payload as an [`UpdateType`].
    ///
    /// Document variants are reduced to the number of entries they carry,
    /// while a settings update is cloned as-is.
    pub fn update_type(&self) -> UpdateType {
        match self {
            Self::ClearAll => UpdateType::ClearAll,
            Self::Customs(_) => UpdateType::Customs,
            Self::DocumentsAddition { documents: docs, .. } => {
                let number = docs.len();
                UpdateType::DocumentsAddition { number }
            }
            Self::DocumentsPartial { documents: docs, .. } => {
                let number = docs.len();
                UpdateType::DocumentsPartial { number }
            }
            Self::DocumentsDeletion(ids) => {
                let number = ids.len();
                UpdateType::DocumentsDeletion { number }
            }
            Self::Settings(boxed) => {
                let settings = Box::clone(boxed);
                UpdateType::Settings { settings }
            }
        }
    }
}
/// Summary of an update, as produced by `UpdateData::update_type`.
///
/// (De)serialized with the variant name stored under the `"name"` key.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "name")]
pub enum UpdateType {
ClearAll,
Customs,
// For the three document variants, `number` is the length of the
// corresponding collection in `UpdateData`.
DocumentsAddition { number: usize },
DocumentsPartial { number: usize },
DocumentsDeletion { number: usize },
Settings { settings: Box<SettingsUpdate> },
}
/// Content of an update in the `UpdateStatus::Failed` or
/// `UpdateStatus::Processed` state.
///
/// Keys are camelCase once (de)serialized.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ProcessedUpdateResult {
pub update_id: u64,
/// Stored under the `"type"` key.
#[serde(rename = "type")]
pub update_type: UpdateType,
// The four error fields below are left out of the serialized output when
// they are `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error_type: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error_code: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error_link: Option<String>,
pub duration: f64, // in seconds
/// (De)serialized as an RFC 3339 string.
#[serde(with = "time::serde::rfc3339")]
pub enqueued_at: OffsetDateTime,
/// (De)serialized as an RFC 3339 string.
#[serde(with = "time::serde::rfc3339")]
pub processed_at: OffsetDateTime,
}
/// Content of an update in the `UpdateStatus::Enqueued` state.
///
/// Keys are camelCase once (de)serialized.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct EnqueuedUpdateResult {
pub update_id: u64,
/// Stored under the `"type"` key.
#[serde(rename = "type")]
pub update_type: UpdateType,
/// (De)serialized as an RFC 3339 string.
#[serde(with = "time::serde::rfc3339")]
pub enqueued_at: OffsetDateTime,
}
/// One update together with its status.
///
/// The variant is selected by the `"status"` key, whose value is the
/// camelCase variant name; the fields of `content` are flattened into the
/// same object.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase", tag = "status")]
pub enum UpdateStatus {
Enqueued {
#[serde(flatten)]
content: EnqueuedUpdateResult,
},
Failed {
#[serde(flatten)]
content: ProcessedUpdateResult,
},
Processed {
#[serde(flatten)]
content: ProcessedUpdateResult,
},
}
impl UpdateStatus {
pub fn enqueued_at(&self) -> &OffsetDateTime {
match self {
UpdateStatus::Enqueued { content } => &content.enqueued_at,
UpdateStatus::Failed { content } | UpdateStatus::Processed { content } => {
&content.enqueued_at
}
}
}
}

22
dump/src/reader/v1/v1.rs Normal file
View File

@ -0,0 +1,22 @@
use serde::Deserialize;
use time::OffsetDateTime;
/// One entry of `Metadata::indexes`.
///
/// Keys are camelCase in the deserialized input.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Index {
pub name: String,
pub uid: String,
/// Parsed from an RFC 3339 string.
#[serde(with = "time::serde::rfc3339")]
created_at: OffsetDateTime,
/// Parsed from an RFC 3339 string.
#[serde(with = "time::serde::rfc3339")]
updated_at: OffsetDateTime,
pub primary_key: Option<String>,
}
/// Content of a v1 dump's `metadata.json`, as deserialized by `V1Reader::open`.
///
/// Keys are camelCase in the deserialized input.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Metadata {
/// Indexes listed by the dump.
indexes: Vec<Index>,
db_version: String,
dump_version: crate::Version,
}