mirror of
				https://github.com/meilisearch/meilisearch.git
				synced 2025-11-04 01:46:28 +00:00 
			
		
		
		
	Send a WakeUp when writing data in the BBQueue buffers
This commit is contained in:
		@@ -422,6 +422,12 @@ impl<'b> ExtractorBbqueueSender<'b> {
 | 
			
		||||
        // We could commit only the used memory.
 | 
			
		||||
        grant.commit(total_length);
 | 
			
		||||
 | 
			
		||||
        // We only send a wake up message when the channel is empty
 | 
			
		||||
        // so that we don't fill the channel with too many WakeUps.
 | 
			
		||||
        if self.sender.is_empty() {
 | 
			
		||||
            self.sender.send(ReceiverAction::WakeUp).unwrap();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@@ -460,6 +466,12 @@ impl<'b> ExtractorBbqueueSender<'b> {
 | 
			
		||||
        // We could commit only the used memory.
 | 
			
		||||
        grant.commit(total_length);
 | 
			
		||||
 | 
			
		||||
        // We only send a wake up message when the channel is empty
 | 
			
		||||
        // so that we don't fill the channel with too many WakeUps.
 | 
			
		||||
        if self.sender.is_empty() {
 | 
			
		||||
            self.sender.send(ReceiverAction::WakeUp).unwrap();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@@ -511,6 +523,12 @@ impl<'b> ExtractorBbqueueSender<'b> {
 | 
			
		||||
        // We could commit only the used memory.
 | 
			
		||||
        grant.commit(total_length);
 | 
			
		||||
 | 
			
		||||
        // We only send a wake up message when the channel is empty
 | 
			
		||||
        // so that we don't fill the channel with too many WakeUps.
 | 
			
		||||
        if self.sender.is_empty() {
 | 
			
		||||
            self.sender.send(ReceiverAction::WakeUp).unwrap();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@@ -561,6 +579,12 @@ impl<'b> ExtractorBbqueueSender<'b> {
 | 
			
		||||
        // We could commit only the used memory.
 | 
			
		||||
        grant.commit(total_length);
 | 
			
		||||
 | 
			
		||||
        // We only send a wake up message when the channel is empty
 | 
			
		||||
        // so that we don't fill the channel with too many WakeUps.
 | 
			
		||||
        if self.sender.is_empty() {
 | 
			
		||||
            self.sender.send(ReceiverAction::WakeUp).unwrap();
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        Ok(())
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -420,61 +420,27 @@ where
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
 | 
			
		||||
                while let Some(frame_with_header) = writer_receiver.read() {
 | 
			
		||||
                    match frame_with_header.header() {
 | 
			
		||||
                        EntryHeader::DbOperation(operation) => {
 | 
			
		||||
                            let database_name = operation.database.database_name();
 | 
			
		||||
                            let database = operation.database.database(index);
 | 
			
		||||
                            let frame = frame_with_header.frame();
 | 
			
		||||
                            match operation.key_value(frame) {
 | 
			
		||||
                                (key, Some(value)) => {
 | 
			
		||||
                                    if let Err(error) = database.put(wtxn, key, value) {
 | 
			
		||||
                                        return Err(Error::InternalError(InternalError::StorePut {
 | 
			
		||||
                                            database_name,
 | 
			
		||||
                                            key: key.into(),
 | 
			
		||||
                                            value_length: value.len(),
 | 
			
		||||
                                            error,
 | 
			
		||||
                                        }));
 | 
			
		||||
                                    }
 | 
			
		||||
                                }
 | 
			
		||||
                                (key, None) => match database.delete(wtxn, key) {
 | 
			
		||||
                                    Ok(false) => {
 | 
			
		||||
                                        unreachable!("We tried to delete an unknown key: {key:?}")
 | 
			
		||||
                                    }
 | 
			
		||||
                                    Ok(_) => (),
 | 
			
		||||
                                    Err(error) => {
 | 
			
		||||
                                        return Err(Error::InternalError(
 | 
			
		||||
                                            InternalError::StoreDeletion {
 | 
			
		||||
                                                database_name,
 | 
			
		||||
                                                key: key.into(),
 | 
			
		||||
                                                error,
 | 
			
		||||
                                            },
 | 
			
		||||
                                        ));
 | 
			
		||||
                                    }
 | 
			
		||||
                                },
 | 
			
		||||
                            }
 | 
			
		||||
                        }
 | 
			
		||||
                        EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid }) => {
 | 
			
		||||
                            for (_index, (_name, _embedder, writer, dimensions)) in &mut arroy_writers {
 | 
			
		||||
                                let dimensions = *dimensions;
 | 
			
		||||
                                writer.del_items(wtxn, dimensions, docid)?;
 | 
			
		||||
                            }
 | 
			
		||||
                        }
 | 
			
		||||
                        EntryHeader::ArroySetVector(asv) => {
 | 
			
		||||
                            let ArroySetVector { docid, embedder_id, .. } = asv;
 | 
			
		||||
                            let frame = frame_with_header.frame();
 | 
			
		||||
                            let embedding = asv.read_embedding_into_vec(frame, &mut aligned_embedding);
 | 
			
		||||
                            let (_, _, writer, dimensions) =
 | 
			
		||||
                                arroy_writers.get(&embedder_id).expect("requested a missing embedder");
 | 
			
		||||
                            writer.del_items(wtxn, *dimensions, docid)?;
 | 
			
		||||
                            writer.add_item(wtxn, docid, embedding)?;
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
                // Every time the is a message in the channel we search
 | 
			
		||||
                // for new entries in the BBQueue buffers.
 | 
			
		||||
                write_from_bbqueue(
 | 
			
		||||
                    &mut writer_receiver,
 | 
			
		||||
                    index,
 | 
			
		||||
                    wtxn,
 | 
			
		||||
                    &arroy_writers,
 | 
			
		||||
                    &mut aligned_embedding,
 | 
			
		||||
                )?;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        todo!("read the BBQueue once the channel is closed");
 | 
			
		||||
            // Once the extractor/writer channel is closed
 | 
			
		||||
            // we must process the remaining BBQueue messages.
 | 
			
		||||
            write_from_bbqueue(
 | 
			
		||||
                &mut writer_receiver,
 | 
			
		||||
                index,
 | 
			
		||||
                wtxn,
 | 
			
		||||
                &arroy_writers,
 | 
			
		||||
                &mut aligned_embedding,
 | 
			
		||||
            )?;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        'vectors: {
 | 
			
		||||
            let span =
 | 
			
		||||
@@ -548,6 +514,70 @@ where
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/// A function dedicated to manage all the available BBQueue frames.
 | 
			
		||||
///
 | 
			
		||||
/// It reads all the available frames, do the corresponding database operations
 | 
			
		||||
/// and stops when no frame are available.
 | 
			
		||||
fn write_from_bbqueue(
 | 
			
		||||
    writer_receiver: &mut WriterBbqueueReceiver<'_>,
 | 
			
		||||
    index: &Index,
 | 
			
		||||
    wtxn: &mut RwTxn<'_>,
 | 
			
		||||
    arroy_writers: &HashMap<u8, (&str, &crate::vector::Embedder, ArroyWrapper, usize)>,
 | 
			
		||||
    aligned_embedding: &mut Vec<f32>,
 | 
			
		||||
) -> crate::Result<()> {
 | 
			
		||||
    while let Some(frame_with_header) = writer_receiver.read() {
 | 
			
		||||
        match frame_with_header.header() {
 | 
			
		||||
            EntryHeader::DbOperation(operation) => {
 | 
			
		||||
                let database_name = operation.database.database_name();
 | 
			
		||||
                let database = operation.database.database(index);
 | 
			
		||||
                let frame = frame_with_header.frame();
 | 
			
		||||
                match operation.key_value(frame) {
 | 
			
		||||
                    (key, Some(value)) => {
 | 
			
		||||
                        if let Err(error) = database.put(wtxn, key, value) {
 | 
			
		||||
                            return Err(Error::InternalError(InternalError::StorePut {
 | 
			
		||||
                                database_name,
 | 
			
		||||
                                key: key.into(),
 | 
			
		||||
                                value_length: value.len(),
 | 
			
		||||
                                error,
 | 
			
		||||
                            }));
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                    (key, None) => match database.delete(wtxn, key) {
 | 
			
		||||
                        Ok(false) => {
 | 
			
		||||
                            unreachable!("We tried to delete an unknown key: {key:?}")
 | 
			
		||||
                        }
 | 
			
		||||
                        Ok(_) => (),
 | 
			
		||||
                        Err(error) => {
 | 
			
		||||
                            return Err(Error::InternalError(InternalError::StoreDeletion {
 | 
			
		||||
                                database_name,
 | 
			
		||||
                                key: key.into(),
 | 
			
		||||
                                error,
 | 
			
		||||
                            }));
 | 
			
		||||
                        }
 | 
			
		||||
                    },
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid }) => {
 | 
			
		||||
                for (_index, (_name, _embedder, writer, dimensions)) in arroy_writers {
 | 
			
		||||
                    let dimensions = *dimensions;
 | 
			
		||||
                    writer.del_items(wtxn, dimensions, docid)?;
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            EntryHeader::ArroySetVector(asv) => {
 | 
			
		||||
                let ArroySetVector { docid, embedder_id, .. } = asv;
 | 
			
		||||
                let frame = frame_with_header.frame();
 | 
			
		||||
                let embedding = asv.read_embedding_into_vec(frame, aligned_embedding);
 | 
			
		||||
                let (_, _, writer, dimensions) =
 | 
			
		||||
                    arroy_writers.get(&embedder_id).expect("requested a missing embedder");
 | 
			
		||||
                writer.del_items(wtxn, *dimensions, docid)?;
 | 
			
		||||
                writer.add_item(wtxn, docid, embedding)?;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    Ok(())
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")]
 | 
			
		||||
fn compute_prefix_database(
 | 
			
		||||
    index: &Index,
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user