Spawn threads with names

This commit is contained in:
Louis Dureuil 2022-11-16 09:50:47 +01:00
parent 51be75a264
commit 93afeedcea
No known key found for this signature in database
3 changed files with 54 additions and 42 deletions

View File

@ -126,24 +126,27 @@ impl IndexMapper {
let index_map = self.index_map.clone(); let index_map = self.index_map.clone();
let index_path = self.base_path.join(uuid.to_string()); let index_path = self.base_path.join(uuid.to_string());
let index_name = name.to_string(); let index_name = name.to_string();
thread::spawn(move || { thread::Builder::new()
// We first wait to be sure that the previously opened index is effectively closed. .name(String::from("index_deleter"))
// This can take a lot of time, this is why we do that in a seperate thread. .spawn(move || {
if let Some(closing_event) = closing_event { // We first wait to be sure that the previously opened index is effectively closed.
closing_event.wait(); // This can take a lot of time, this is why we do that in a seperate thread.
} if let Some(closing_event) = closing_event {
closing_event.wait();
}
// Then we remove the content from disk. // Then we remove the content from disk.
if let Err(e) = fs::remove_dir_all(&index_path) { if let Err(e) = fs::remove_dir_all(&index_path) {
error!( error!(
"An error happened when deleting the index {} ({}): {}", "An error happened when deleting the index {} ({}): {}",
index_name, uuid, e index_name, uuid, e
); );
} }
// Finally we remove the entry from the index map. // Finally we remove the entry from the index map.
assert!(matches!(index_map.write().unwrap().remove(&uuid), Some(BeingDeleted))); assert!(matches!(index_map.write().unwrap().remove(&uuid), Some(BeingDeleted)));
}); })
.unwrap();
Ok(()) Ok(())
} }

View File

@ -412,28 +412,31 @@ impl IndexScheduler {
/// only once per index scheduler. /// only once per index scheduler.
fn run(&self) { fn run(&self) {
let run = self.private_clone(); let run = self.private_clone();
std::thread::spawn(move || loop { std::thread::Builder::new()
run.wake_up.wait(); .name(String::from("scheduler"))
.spawn(move || loop {
run.wake_up.wait();
match run.tick() { match run.tick() {
Ok(0) => (), Ok(0) => (),
Ok(_) => run.wake_up.signal(), Ok(_) => run.wake_up.signal(),
Err(e) => { Err(e) => {
log::error!("{}", e); log::error!("{}", e);
// Wait one second when an irrecoverable error occurs. // Wait one second when an irrecoverable error occurs.
if matches!( if matches!(
e, e,
Error::CorruptedTaskQueue Error::CorruptedTaskQueue
| Error::TaskDatabaseUpdate(_) | Error::TaskDatabaseUpdate(_)
| Error::HeedTransaction(_) | Error::HeedTransaction(_)
| Error::CreateBatch(_) | Error::CreateBatch(_)
) { ) {
std::thread::sleep(Duration::from_secs(1)); std::thread::sleep(Duration::from_secs(1));
}
run.wake_up.signal();
} }
run.wake_up.signal();
} }
} })
}); .unwrap();
} }
pub fn indexer_config(&self) -> &IndexerConfig { pub fn indexer_config(&self) -> &IndexerConfig {
@ -925,7 +928,10 @@ impl IndexScheduler {
// 2. Process the tasks // 2. Process the tasks
let res = { let res = {
let cloned_index_scheduler = self.private_clone(); let cloned_index_scheduler = self.private_clone();
let handle = std::thread::spawn(move || cloned_index_scheduler.process_batch(batch)); let handle = std::thread::Builder::new()
.name(String::from("batch-operation"))
.spawn(move || cloned_index_scheduler.process_batch(batch))
.unwrap();
handle.join().unwrap_or(Err(Error::ProcessBatchPanicked)) handle.join().unwrap_or(Err(Error::ProcessBatchPanicked))
}; };

View File

@ -204,12 +204,15 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Auth
if opt.schedule_snapshot { if opt.schedule_snapshot {
let snapshot_delay = Duration::from_secs(opt.snapshot_interval_sec); let snapshot_delay = Duration::from_secs(opt.snapshot_interval_sec);
let index_scheduler = index_scheduler.clone(); let index_scheduler = index_scheduler.clone();
thread::spawn(move || loop { thread::Builder::new()
thread::sleep(snapshot_delay); .name(String::from("register-snapshot-tasks"))
if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation) { .spawn(move || loop {
error!("Error while registering snapshot: {}", e); thread::sleep(snapshot_delay);
} if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation) {
}); error!("Error while registering snapshot: {}", e);
}
})
.unwrap();
} }
Ok((index_scheduler, auth_controller)) Ok((index_scheduler, auth_controller))