Skip to content

Merge disk space aware take 3 #127828

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 31 commits into
base: main
Choose a base branch
from

Conversation

albertzaharovits
Copy link
Contributor

No description provided.

Comment on lines +540 to +547
private void dropBudgetOverflow() {
synchronized (mutex) {
for (E element : budgetOverflow) {
budgetOverflowConsumer.accept(element);
}
budgetOverflow.clear();
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As soon as more budget becomes available, re-enqueue all the merge tasks that were over-budget.
It's complicated to re-enqueue only "within" available budget.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds reasonable to me though we'll need a combined view of the two queues (the budgetoverflow and the real queue) for autoscaling to trigger heap based autoscaling.

this.executorService = threadPool.executor(ThreadPool.Names.MERGE);
this.maxConcurrentMerges = threadPool.info(ThreadPool.Names.MERGE).getMax();
// the intent here is to throttle down whenever we submit a task and no other task is running
this.concurrentMergesFloorLimitForThrottling = 2;
this.concurrentMergesCeilLimitForThrottling = maxConcurrentMerges * 2;
assert concurrentMergesFloorLimitForThrottling <= concurrentMergesCeilLimitForThrottling;
this.budgetTracker = new BudgetTracker<>(MergeTask::estimatedRemainingMergeSize, Long.MAX_VALUE, queuedMergeTasks::add);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the last argument, i.e. queuedMergeTasks::add, is invoked for the over-budget merge tasks (as soon as more budget becomes available).

// it's then the duty of the said merge scheduler to re-enqueue the backlogged merge task when it can be run
}
} catch (BudgetTracker.BudgetOverflowException e) {
// continue to poll for new merge tasks, over-budget merge task will be reenqueued
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

over-budget merge tasks have been "backlogged" until some more budget will become available, at which point all will be re-enqueued.

Copy link
Contributor

@henningandersen henningandersen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this direction better, looks good.

synchronized (mutex) {
if (elementBudget > availableBudget) {
budgetOverflow.add(element);
throw new BudgetOverflowException();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'd prefer a return value, perhaps just null?

Comment on lines +540 to +547
private void dropBudgetOverflow() {
synchronized (mutex) {
for (E element : budgetOverflow) {
budgetOverflowConsumer.accept(element);
}
budgetOverflow.clear();
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds reasonable to me though we'll need a combined view of the two queues (the budgetoverflow and the real queue) for autoscaling to trigger heap based autoscaling.

}

void updateBudget(long availableBudget) {
assert availableBudget > 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is == 0 not legal too?

static class BudgetTracker<E> {
private final ToLongFunction<? super E> budgetFunction;
private long availableBudget;
private final IdentityHashMap<E, Long> commitedBudgetPerElement;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
private final IdentityHashMap<E, Long> commitedBudgetPerElement;
private final IdentityHashMap<E, Long> committedBudgetPerElement;

private long availableBudget;
private final IdentityHashMap<E, Long> commitedBudgetPerElement;
private final List<E> budgetOverflow;
private final Consumer<? super E> budgetOverflowConsumer;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I understand "overflow" in this name. Is it the requeueConsumer? Or maybe the moreBudgetAvailableConsumer?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants