Change bucket_sort logic to pass the time budget and allow for retrieving non-blocking buckets

This commit is contained in:
Louis Dureuil
2025-10-15 15:33:45 +02:00
parent 58f30e9d8a
commit b418054ee4

View File

@@ -97,7 +97,7 @@ pub fn bucket_sort<'ctx, Q: RankingRuleQueryTrait>(
logger.start_iteration_ranking_rule(0, ranking_rules[0].as_ref(), query, universe);
ranking_rules[0].start_iteration(ctx, logger, universe, query)?;
ranking_rules[0].start_iteration(ctx, logger, universe, query, &time_budget)?;
let mut ranking_rule_scores: Vec<ScoreDetails> = vec![];
@@ -168,42 +168,6 @@ pub fn bucket_sort<'ctx, Q: RankingRuleQueryTrait>(
};
while valid_docids.len() < max_len_to_evaluate {
if time_budget.exceeded() {
loop {
let bucket = std::mem::take(&mut ranking_rule_universes[cur_ranking_rule_index]);
ranking_rule_scores.push(ScoreDetails::Skipped);
// remove candidates from the universe without adding them to result if their score is below the threshold
let is_below_threshold =
ranking_score_threshold.is_some_and(|ranking_score_threshold| {
let current_score = ScoreDetails::global_score(ranking_rule_scores.iter());
current_score < ranking_score_threshold
});
if is_below_threshold {
all_candidates -= &bucket;
all_candidates -= &ranking_rule_universes[cur_ranking_rule_index];
} else {
maybe_add_to_results!(bucket);
}
ranking_rule_scores.pop();
if cur_ranking_rule_index == 0 {
break;
}
back!();
}
return Ok(BucketSortOutput {
scores: valid_scores,
docids: valid_docids,
all_candidates,
degraded: true,
});
}
// The universe for this bucket is zero, so we don't need to sort
// anything, just go back to the parent ranking rule.
if ranking_rule_universes[cur_ranking_rule_index].is_empty()
@@ -216,14 +180,63 @@ pub fn bucket_sort<'ctx, Q: RankingRuleQueryTrait>(
continue;
}
let Some(next_bucket) = ranking_rules[cur_ranking_rule_index].next_bucket(
ctx,
logger,
&ranking_rule_universes[cur_ranking_rule_index],
)?
else {
back!();
continue;
let next_bucket = if time_budget.exceeded() {
match ranking_rules[cur_ranking_rule_index].non_blocking_next_bucket(
ctx,
logger,
&ranking_rule_universes[cur_ranking_rule_index],
)? {
std::task::Poll::Ready(bucket) => bucket,
std::task::Poll::Pending => {
loop {
let bucket =
std::mem::take(&mut ranking_rule_universes[cur_ranking_rule_index]);
ranking_rule_scores.push(ScoreDetails::Skipped);
// remove candidates from the universe without adding them to result if their score is below the threshold
let is_below_threshold =
ranking_score_threshold.is_some_and(|ranking_score_threshold| {
let current_score =
ScoreDetails::global_score(ranking_rule_scores.iter());
current_score < ranking_score_threshold
});
if is_below_threshold {
all_candidates -= &bucket;
all_candidates -= &ranking_rule_universes[cur_ranking_rule_index];
} else {
maybe_add_to_results!(bucket);
}
ranking_rule_scores.pop();
if cur_ranking_rule_index == 0 {
break;
}
back!();
}
return Ok(BucketSortOutput {
scores: valid_scores,
docids: valid_docids,
all_candidates,
degraded: true,
});
}
}
} else {
let Some(next_bucket) = ranking_rules[cur_ranking_rule_index].next_bucket(
ctx,
logger,
&ranking_rule_universes[cur_ranking_rule_index],
&time_budget,
)?
else {
back!();
continue;
};
next_bucket
};
ranking_rule_scores.push(next_bucket.score);
@@ -275,6 +288,7 @@ pub fn bucket_sort<'ctx, Q: RankingRuleQueryTrait>(
logger,
&next_bucket.candidates,
&next_bucket.query,
&time_budget,
)?;
}