Add dump support

@@ -202,6 +202,10 @@ impl CompatV5ToV6 {
     pub fn network(&self) -> Result<Option<&v6::Network>> {
         Ok(None)
     }
+
+    pub fn webhooks(&self) -> Option<&v6::Webhooks> {
+        None
+    }
 }
 
 pub enum CompatIndexV5ToV6 {

@@ -138,6 +138,13 @@ impl DumpReader {
             DumpReader::Compat(compat) => compat.network(),
         }
     }
+
+    pub fn webhooks(&self) -> Option<&v6::Webhooks> {
+        match self {
+            DumpReader::Current(current) => current.webhooks(),
+            DumpReader::Compat(compat) => compat.webhooks(),
+        }
+    }
 }
 
 impl From<V6Reader> for DumpReader {

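A note on the two reader hunks above: dumps created before v6 cannot contain webhooks, so the v5-to-v6 compat layer unconditionally answers None, and DumpReader simply dispatches to whichever backing reader it wraps. Below is a minimal, self-contained model of that dispatch; every name in it is an illustrative stand-in, not the real meilisearch type.

#[derive(Debug)]
struct Webhooks;

struct V6Reader {
    webhooks: Option<Webhooks>,
}

struct CompatV5ToV6; // v5 dumps predate webhooks entirely

enum DumpReader {
    Current(V6Reader),
    Compat(CompatV5ToV6),
}

impl DumpReader {
    fn webhooks(&self) -> Option<&Webhooks> {
        match self {
            // Only a v6 dump can carry a webhooks payload...
            DumpReader::Current(current) => current.webhooks.as_ref(),
            // ...a dump upgraded from v5 never does.
            DumpReader::Compat(_) => None,
        }
    }
}

fn main() {
    let old_dump = DumpReader::Compat(CompatV5ToV6);
    assert!(old_dump.webhooks().is_none()); // upgraded dumps report no webhooks

    let new_dump = DumpReader::Current(V6Reader { webhooks: Some(Webhooks) });
    assert!(new_dump.webhooks().is_some());
}
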
@@ -25,6 +25,7 @@ pub type Key = meilisearch_types::keys::Key;
 pub type ChatCompletionSettings = meilisearch_types::features::ChatCompletionSettings;
 pub type RuntimeTogglableFeatures = meilisearch_types::features::RuntimeTogglableFeatures;
 pub type Network = meilisearch_types::features::Network;
+pub type Webhooks = meilisearch_types::webhooks::Webhooks;
 
 // ===== Other types to clarify the code of the compat module
 // everything related to the tasks

@@ -59,6 +60,7 @@ pub struct V6Reader {
     keys: BufReader<File>,
     features: Option<RuntimeTogglableFeatures>,
     network: Option<Network>,
+    webhooks: Option<Webhooks>,
 }
 
 impl V6Reader {

@@ -93,8 +95,8 @@ impl V6Reader {
             Err(e) => return Err(e.into()),
         };
 
-        let network_file = match fs::read(dump.path().join("network.json")) {
-            Ok(network_file) => Some(network_file),
+        let network = match fs::read(dump.path().join("network.json")) {
+            Ok(network_file) => Some(serde_json::from_reader(&*network_file)?),
             Err(error) => match error.kind() {
                 // Allows the file to be missing, this will only result in all experimental features disabled.
                 ErrorKind::NotFound => {

@@ -104,10 +106,16 @@ impl V6Reader {
                 _ => return Err(error.into()),
             },
         };
-        let network = if let Some(network_file) = network_file {
-            Some(serde_json::from_reader(&*network_file)?)
-        } else {
-            None
-        };
+
+        let webhooks = match fs::read(dump.path().join("webhooks.json")) {
+            Ok(webhooks_file) => Some(serde_json::from_reader(&*webhooks_file)?),
+            Err(error) => match error.kind() {
+                ErrorKind::NotFound => {
+                    debug!("`webhooks.json` not found in dump");
+                    None
+                }
+                _ => return Err(error.into()),
+            },
+        };
 
         Ok(V6Reader {

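Both network.json and webhooks.json now follow the same "optional dump file" pattern: read the bytes, parse them directly in the Ok arm (the old two-step network_file dance above is folded into a single match), and treat a missing file as None rather than an error, because dumps taken before the feature existed never wrote the file. A self-contained sketch of the pattern, using serde_json::Value as a stand-in for the concrete settings types and a hypothetical helper name:

use std::io::{self, ErrorKind};
use std::path::Path;

fn read_optional_json(path: &Path) -> io::Result<Option<serde_json::Value>> {
    match std::fs::read(path) {
        // `&*bytes` reborrows the Vec<u8> as &[u8], which implements io::Read.
        Ok(bytes) => Ok(Some(serde_json::from_reader(&*bytes)?)),
        Err(error) => match error.kind() {
            // Tolerate a missing file: older dumps simply never wrote one.
            ErrorKind::NotFound => Ok(None),
            _ => Err(error),
        },
    }
}

fn main() -> io::Result<()> {
    // A path that does not exist resolves to Ok(None), not an error.
    assert!(read_optional_json(Path::new("/no/such/webhooks.json"))?.is_none());
    Ok(())
}
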
@@ -119,6 +127,7 @@ impl V6Reader {
             features,
             network,
             dump,
+            webhooks,
         })
     }
 

@@ -229,6 +238,10 @@ impl V6Reader {
     pub fn network(&self) -> Option<&Network> {
         self.network.as_ref()
     }
+
+    pub fn webhooks(&self) -> Option<&Webhooks> {
+        self.webhooks.as_ref()
+    }
 }
 
 pub struct UpdateFile {

@@ -8,6 +8,7 @@ use meilisearch_types::batches::Batch;
 use meilisearch_types::features::{ChatCompletionSettings, Network, RuntimeTogglableFeatures};
 use meilisearch_types::keys::Key;
 use meilisearch_types::settings::{Checked, Settings};
+use meilisearch_types::webhooks::Webhooks;
 use serde_json::{Map, Value};
 use tempfile::TempDir;
 use time::OffsetDateTime;

@@ -74,6 +75,16 @@ impl DumpWriter {
         Ok(std::fs::write(self.dir.path().join("network.json"), serde_json::to_string(&network)?)?)
     }
 
+    pub fn create_webhooks(&self, webhooks: Webhooks) -> Result<()> {
+        if webhooks == Webhooks::default() {
+            return Ok(());
+        }
+        Ok(std::fs::write(
+            self.dir.path().join("webhooks.json"),
+            serde_json::to_string(&webhooks)?,
+        )?)
+    }
+
     pub fn persist_to(self, mut writer: impl Write) -> Result<()> {
         let gz_encoder = GzEncoder::new(&mut writer, Compression::default());
         let mut tar_encoder = tar::Builder::new(gz_encoder);

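On the writer side, create_webhooks deliberately writes nothing when the configuration is still the default; combined with the reader's NotFound tolerance above, a dump round-trips cleanly whether or not webhooks were ever configured. A self-contained sketch of that round trip, with a simplified stand-in for the real meilisearch_types::webhooks::Webhooks (assumes the serde, serde_json, and tempfile crates):

use std::collections::BTreeMap;
use std::io::ErrorKind;
use std::path::Path;

// Simplified stand-in: the real struct holds a map of Webhook entries.
#[derive(Debug, Default, PartialEq, serde::Serialize)]
struct Webhooks {
    webhooks: BTreeMap<String, String>,
}

fn create_webhooks(dir: &Path, webhooks: Webhooks) -> std::io::Result<()> {
    // Relies on PartialEq (derived on the real type in a later hunk):
    // a default, empty configuration is not persisted at all.
    if webhooks == Webhooks::default() {
        return Ok(());
    }
    std::fs::write(dir.join("webhooks.json"), serde_json::to_string(&webhooks)?)
}

fn main() -> std::io::Result<()> {
    let dir = tempfile::tempdir()?;
    create_webhooks(dir.path(), Webhooks::default())?;
    // No file was written, so a reader hits NotFound and falls back to None,
    // i.e. default webhooks on import.
    let err = std::fs::read(dir.path().join("webhooks.json")).unwrap_err();
    assert_eq!(err.kind(), ErrorKind::NotFound);
    Ok(())
}
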
@@ -108,6 +108,7 @@ make_enum_progress! {
         DumpTheBatches,
         DumpTheIndexes,
         DumpTheExperimentalFeatures,
+        DumpTheWebhooks,
         CompressTheDump,
     }
 }

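The new DumpTheWebhooks variant slots in between DumpTheExperimentalFeatures and CompressTheDump so that the progress enum lists its steps in the order the dump task actually performs them (see the IndexScheduler hunk below).
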
@@ -270,6 +270,11 @@ impl IndexScheduler {
         let network = self.network();
         dump.create_network(network)?;
 
+        // 7. Dump the webhooks
+        progress.update_progress(DumpCreationProgress::DumpTheWebhooks);
+        let webhooks = self.webhooks();
+        dump.create_webhooks(webhooks)?;
+
         let dump_uid = started_at.format(format_description!(
             "[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]"
         )).unwrap();

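Webhook dumping runs as step 7, after the network dump and just before the dump uid is formatted. Note that dump.create_webhooks(webhooks) takes its argument by value, so self.webhooks() here presumably returns an owned Webhooks (likely a copy of the scheduler's in-memory state) rather than a reference.
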
@@ -10,7 +10,7 @@ pub struct Webhook {
     pub headers: BTreeMap<String, String>,
 }
 
-#[derive(Debug, Serialize, Deserialize, Default, Clone)]
+#[derive(Debug, Serialize, Deserialize, Default, Clone, PartialEq)]
 #[serde(rename_all = "camelCase")]
 pub struct Webhooks {
     #[serde(default)]

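The added PartialEq derive is what makes the webhooks == Webhooks::default() comparison in DumpWriter::create_webhooks possible; nothing else in this change appears to need it.
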
@@ -515,7 +515,12 @@ fn import_dump(
         let _ = std::fs::write(db_path.join("instance-uid"), instance_uid.to_string().as_bytes());
     };
 
-    // 2. Import the `Key`s.
+    // 2. Import the webhooks
+    if let Some(webhooks) = dump_reader.webhooks() {
+        index_scheduler.put_webhooks(webhooks.clone())?;
+    }
+
+    // 3. Import the `Key`s.
     let mut keys = Vec::new();
     auth.raw_delete_all_keys()?;
     for key in dump_reader.keys()? {

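Because webhook import becomes step 2, every later step comment in import_dump shifts up by one; the remaining hunks in this file are that renumbering and nothing else.
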
@@ -524,20 +529,20 @@ fn import_dump(
         keys.push(key);
     }
 
-    // 3. Import the `ChatCompletionSettings`s.
+    // 4. Import the `ChatCompletionSettings`s.
     for result in dump_reader.chat_completions_settings()? {
         let (name, settings) = result?;
         index_scheduler.put_chat_settings(&name, &settings)?;
     }
 
-    // 4. Import the runtime features and network
+    // 5. Import the runtime features and network
     let features = dump_reader.features()?.unwrap_or_default();
     index_scheduler.put_runtime_features(features)?;
 
     let network = dump_reader.network()?.cloned().unwrap_or_default();
     index_scheduler.put_network(network)?;
 
-    // 4.1 Use all cpus to process dump if `max_indexing_threads` not configured
+    // 5.1 Use all cpus to process dump if `max_indexing_threads` not configured
     let backup_config;
     let base_config = index_scheduler.indexer_config();
 

|
|||||||
// /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might
|
// /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might
|
||||||
// try to process tasks while we're trying to import the indexes.
|
// try to process tasks while we're trying to import the indexes.
|
||||||
|
|
||||||
// 5. Import the indexes.
|
// 6. Import the indexes.
|
||||||
for index_reader in dump_reader.indexes()? {
|
for index_reader in dump_reader.indexes()? {
|
||||||
let mut index_reader = index_reader?;
|
let mut index_reader = index_reader?;
|
||||||
let metadata = index_reader.metadata();
|
let metadata = index_reader.metadata();
|
||||||
@@ -567,12 +572,12 @@ fn import_dump(
         let mut wtxn = index.write_txn()?;
 
         let mut builder = milli::update::Settings::new(&mut wtxn, &index, indexer_config);
-        // 5.1 Import the primary key if there is one.
+        // 6.1 Import the primary key if there is one.
         if let Some(ref primary_key) = metadata.primary_key {
             builder.set_primary_key(primary_key.to_string());
         }
 
-        // 5.2 Import the settings.
+        // 6.2 Import the settings.
         tracing::info!("Importing the settings.");
         let settings = index_reader.settings()?;
         apply_settings_to_builder(&settings, &mut builder);

@@ -584,8 +589,8 @@ fn import_dump(
         let rtxn = index.read_txn()?;
 
         if index_scheduler.no_edition_2024_for_dumps() {
-            // 5.3 Import the documents.
-            // 5.3.1 We need to recreate the grenad+obkv format accepted by the index.
+            // 6.3 Import the documents.
+            // 6.3.1 We need to recreate the grenad+obkv format accepted by the index.
             tracing::info!("Importing the documents.");
             let file = tempfile::tempfile()?;
             let mut builder = DocumentsBatchBuilder::new(BufWriter::new(file));

@@ -596,7 +601,7 @@ fn import_dump(
             // This flush the content of the batch builder.
             let file = builder.into_inner()?.into_inner()?;
 
-            // 5.3.2 We feed it to the milli index.
+            // 6.3.2 We feed it to the milli index.
             let reader = BufReader::new(file);
             let reader = DocumentsBatchReader::from_reader(reader)?;
 

@@ -675,15 +680,15 @@ fn import_dump(
         index_scheduler.refresh_index_stats(&uid)?;
     }
 
-    // 6. Import the queue
+    // 7. Import the queue
     let mut index_scheduler_dump = index_scheduler.register_dumped_task()?;
-    // 6.1. Import the batches
+    // 7.1. Import the batches
     for ret in dump_reader.batches()? {
         let batch = ret?;
         index_scheduler_dump.register_dumped_batch(batch)?;
    }
 
-    // 6.2. Import the tasks
+    // 7.2. Import the tasks
     for ret in dump_reader.tasks()? {
         let (task, file) = ret?;
         index_scheduler_dump.register_dumped_task(task, file)?;

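Taken together, the import order in import_dump is now: 1. the instance uid (written just above the first hunk in this file), 2. webhooks, 3. keys, 4. chat completion settings, 5. runtime features and network, 6. indexes (primary key, settings, documents), 7. the queue (batches, then tasks).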