mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-10-26 13:36:27 +00:00 
			
		
		
		
	Expose a route to get the update file content of a task
This commit is contained in:
		| @@ -109,6 +109,8 @@ pub enum Error { | ||||
|     InvalidIndexUid { index_uid: String }, | ||||
|     #[error("Task `{0}` not found.")] | ||||
|     TaskNotFound(TaskId), | ||||
|     #[error("Task `{0}` does not provide any content file.")] | ||||
|     TaskFileNotFound(TaskId), | ||||
|     #[error("Batch `{0}` not found.")] | ||||
|     BatchNotFound(BatchId), | ||||
|     #[error("Query parameters to filter the tasks to delete are missing. Available query parameters are: `uids`, `indexUids`, `statuses`, `types`, `canceledBy`, `beforeEnqueuedAt`, `afterEnqueuedAt`, `beforeStartedAt`, `afterStartedAt`, `beforeFinishedAt`, `afterFinishedAt`.")] | ||||
| @@ -189,6 +191,7 @@ impl Error { | ||||
|             | Error::InvalidTaskCanceledBy { .. } | ||||
|             | Error::InvalidIndexUid { .. } | ||||
|             | Error::TaskNotFound(_) | ||||
|             | Error::TaskFileNotFound(_) | ||||
|             | Error::BatchNotFound(_) | ||||
|             | Error::TaskDeletionWithEmptyQuery | ||||
|             | Error::TaskCancelationWithEmptyQuery | ||||
| @@ -250,6 +253,7 @@ impl ErrorCode for Error { | ||||
|             Error::InvalidTaskCanceledBy { .. } => Code::InvalidTaskCanceledBy, | ||||
|             Error::InvalidIndexUid { .. } => Code::InvalidIndexUid, | ||||
|             Error::TaskNotFound(_) => Code::TaskNotFound, | ||||
|             Error::TaskFileNotFound(_) => Code::TaskFileNotFound, | ||||
|             Error::BatchNotFound(_) => Code::BatchNotFound, | ||||
|             Error::TaskDeletionWithEmptyQuery => Code::MissingTaskFilters, | ||||
|             Error::TaskCancelationWithEmptyQuery => Code::MissingTaskFilters, | ||||
|   | ||||
| @@ -8,6 +8,7 @@ mod tasks_test; | ||||
| mod test; | ||||
|  | ||||
| use std::collections::BTreeMap; | ||||
| use std::fs::File as StdFile; | ||||
| use std::time::Duration; | ||||
|  | ||||
| use file_store::FileStore; | ||||
| @@ -216,6 +217,11 @@ impl Queue { | ||||
|         } | ||||
|     } | ||||
|  | ||||
|     /// Open and returns the task's content File. | ||||
|     pub fn update_file(&self, uuid: Uuid) -> file_store::Result<StdFile> { | ||||
|         self.file_store.get_update(uuid) | ||||
|     } | ||||
|  | ||||
|     /// Delete a file from the index scheduler. | ||||
|     /// | ||||
|     /// Counterpart to the [`create_update_file`](IndexScheduler::create_update_file) method. | ||||
|   | ||||
| @@ -372,6 +372,7 @@ RemoteRemoteError                     , System               , BAD_GATEWAY ; | ||||
| RemoteTimeout                         , System               , BAD_GATEWAY ; | ||||
| TooManySearchRequests                 , System               , SERVICE_UNAVAILABLE ; | ||||
| TaskNotFound                          , InvalidRequest       , NOT_FOUND ; | ||||
| TaskFileNotFound                      , InvalidRequest       , NOT_FOUND ; | ||||
| BatchNotFound                         , InvalidRequest       , NOT_FOUND ; | ||||
| TooManyOpenFiles                      , System               , UNPROCESSABLE_ENTITY ; | ||||
| TooManyVectors                        , InvalidRequest       , BAD_REQUEST ; | ||||
|   | ||||
| @@ -16,6 +16,7 @@ use serde::Serialize; | ||||
| use time::format_description::well_known::Rfc3339; | ||||
| use time::macros::format_description; | ||||
| use time::{Date, Duration, OffsetDateTime, Time}; | ||||
| use tokio::io::AsyncReadExt; | ||||
| use tokio::task; | ||||
| use utoipa::{IntoParams, OpenApi, ToSchema}; | ||||
|  | ||||
| @@ -44,7 +45,10 @@ pub fn configure(cfg: &mut web::ServiceConfig) { | ||||
|             .route(web::delete().to(SeqHandler(delete_tasks))), | ||||
|     ) | ||||
|     .service(web::resource("/cancel").route(web::post().to(SeqHandler(cancel_tasks)))) | ||||
|     .service(web::resource("/{task_id}").route(web::get().to(SeqHandler(get_task)))); | ||||
|     .service(web::resource("/{task_id}").route(web::get().to(SeqHandler(get_task)))) | ||||
|     .service( | ||||
|         web::resource("/{task_id}/file").route(web::get().to(SeqHandler(get_task_update_file))), | ||||
|     ); | ||||
| } | ||||
|  | ||||
| #[derive(Debug, Deserr, IntoParams)] | ||||
| @@ -639,6 +643,88 @@ async fn get_task( | ||||
|     } | ||||
| } | ||||
|  | ||||
| /// Get a task's update file. | ||||
| /// | ||||
| /// Get a [task's file](https://www.meilisearch.com/docs/learn/async/asynchronous_operations). | ||||
| #[utoipa::path( | ||||
|     get, | ||||
|     path = "/{taskUid}/file", | ||||
|     tag = "Tasks", | ||||
|     security(("Bearer" = ["tasks.get", "tasks.*", "*"])), | ||||
|     params(("taskUid", format = UInt32, example = 0, description = "The task identifier", nullable = false)), | ||||
|     responses( | ||||
|         (status = 200, description = "Task successfully retrieved", body = TaskView, content_type = "application/x-ndjson", example = json!( | ||||
|             { | ||||
|                 "uid": 1, | ||||
|                 "indexUid": "movies", | ||||
|                 "status": "succeeded", | ||||
|                 "type": "documentAdditionOrUpdate", | ||||
|                 "canceledBy": null, | ||||
|                 "details": { | ||||
|                     "receivedDocuments": 79000, | ||||
|                     "indexedDocuments": 79000 | ||||
|                 }, | ||||
|                 "error": null, | ||||
|                 "duration": "PT1S", | ||||
|                 "enqueuedAt": "2021-01-01T09:39:00.000000Z", | ||||
|                 "startedAt": "2021-01-01T09:39:01.000000Z", | ||||
|                 "finishedAt": "2021-01-01T09:39:02.000000Z" | ||||
|             } | ||||
|         )), | ||||
|         (status = 401, description = "The authorization header is missing", body = ResponseError, content_type = "application/json", example = json!( | ||||
|             { | ||||
|                 "message": "The Authorization header is missing. It must use the bearer authorization method.", | ||||
|                 "code": "missing_authorization_header", | ||||
|                 "type": "auth", | ||||
|                 "link": "https://docs.meilisearch.com/errors#missing_authorization_header" | ||||
|             } | ||||
|         )), | ||||
|         (status = 404, description = "The task uid does not exists", body = ResponseError, content_type = "application/json", example = json!( | ||||
|             { | ||||
|                 "message": "Task :taskUid not found.", | ||||
|                 "code": "task_not_found", | ||||
|                 "type": "invalid_request", | ||||
|                 "link": "https://docs.meilisearch.com/errors/#task_not_found" | ||||
|             } | ||||
|         )) | ||||
|     ) | ||||
| )] | ||||
| async fn get_task_update_file( | ||||
|     index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_GET }>, Data<IndexScheduler>>, | ||||
|     task_uid: web::Path<String>, | ||||
| ) -> Result<HttpResponse, ResponseError> { | ||||
|     /// TODO change the example | ||||
|     let task_uid_string = task_uid.into_inner(); | ||||
|  | ||||
|     let task_uid: TaskId = match task_uid_string.parse() { | ||||
|         Ok(id) => id, | ||||
|         Err(_e) => { | ||||
|             return Err(index_scheduler::Error::InvalidTaskUid { task_uid: task_uid_string }.into()) | ||||
|         } | ||||
|     }; | ||||
|  | ||||
|     let query = index_scheduler::Query { uids: Some(vec![task_uid]), ..Query::default() }; | ||||
|     let filters = index_scheduler.filters(); | ||||
|     let (tasks, _) = index_scheduler.get_tasks_from_authorized_indexes(&query, filters)?; | ||||
|  | ||||
|     if let Some(task) = tasks.first() { | ||||
|         match task.content_uuid() { | ||||
|             Some(uuid) => { | ||||
|                 // Yes, that's awful to put everything in memory when we could have streamed it from | ||||
|                 // disk but it's really (really) complex to do with the current state of async Rust. | ||||
|                 let file = index_scheduler.queue.update_file(uuid)?; | ||||
|                 let mut tfile = tokio::fs::File::from_std(file); | ||||
|                 let mut content = String::new(); | ||||
|                 tfile.read_to_string(&mut content).await?; | ||||
|                 Ok(HttpResponse::Ok().content_type("application/x-ndjson").body(content)) | ||||
|             } | ||||
|             None => Err(index_scheduler::Error::TaskFileNotFound(task_uid).into()), | ||||
|         } | ||||
|     } else { | ||||
|         Err(index_scheduler::Error::TaskNotFound(task_uid).into()) | ||||
|     } | ||||
| } | ||||
|  | ||||
| pub enum DeserializeDateOption { | ||||
|     Before, | ||||
|     After, | ||||
|   | ||||
		Reference in New Issue
	
	Block a user