Skip to content

Merge disk space aware take 3 #127828

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 31 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
60fc8b8
watermark settings
albertzaharovits Apr 15, 2025
8718784
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits Apr 24, 2025
253daa1
Check watermark limits only when the thread pool merge scheduler is e…
albertzaharovits Apr 24, 2025
365ae2d
initial merge disk space monitor
albertzaharovits Apr 29, 2025
0e6f7e2
move settings to MergeDiskSpaceMonitor
albertzaharovits Apr 29, 2025
27476ca
remove unused import
albertzaharovits Apr 29, 2025
b9b26fa
Nits
albertzaharovits Apr 29, 2025
1ddd3d3
Thrashy in-between
albertzaharovits May 1, 2025
94d27f9
DiskSpaceMonitor
albertzaharovits May 1, 2025
023f042
PriorityBlockingQueueWithMaxLimit
albertzaharovits May 1, 2025
ad6cf4c
Trimming code
albertzaharovits May 1, 2025
e507684
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits May 1, 2025
6805fa9
[CI] Auto commit changes from spotless
elasticsearchmachine May 1, 2025
33a0246
Fix compilation issue
albertzaharovits May 1, 2025
4c012cc
Fix TPMST
albertzaharovits May 1, 2025
6a6a759
Some test fixes
albertzaharovits May 2, 2025
2f2720d
[CI] Auto commit changes from spotless
elasticsearchmachine May 2, 2025
4f8a203
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits May 5, 2025
edcf6bf
Make settings follow the allocation ones
albertzaharovits May 5, 2025
fbc17ec
PriorityBlockingQueueWithBudget
albertzaharovits May 6, 2025
258b7ed
Merge branch 'main' into merge-disk-space-aware-take-2
albertzaharovits May 6, 2025
bfad0a0
ElementWithReleasableBudget
albertzaharovits May 6, 2025
56df5f7
DiskSpaceMonitor
albertzaharovits May 6, 2025
ab53108
Checkstyle
albertzaharovits May 6, 2025
af1b2ff
Nit
albertzaharovits May 7, 2025
cedf89f
Nits
albertzaharovits May 7, 2025
b1c2d7a
Almost
albertzaharovits May 7, 2025
6c29d87
Looks Ok?
albertzaharovits May 7, 2025
859fe9b
[CI] Auto commit changes from spotless
elasticsearchmachine May 7, 2025
da0403b
fillInStackTrace
albertzaharovits May 7, 2025
81cc2b9
[CI] Auto commit changes from spotless
elasticsearchmachine May 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.engine.ThreadPoolMergeExecutorService;
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler;
import org.elasticsearch.index.shard.IndexingStatsSettings;
import org.elasticsearch.indices.IndexingMemoryController;
Expand Down Expand Up @@ -628,6 +629,9 @@ public void apply(Settings value, Settings current, Settings previous) {
MergePolicyConfig.DEFAULT_MAX_MERGED_SEGMENT_SETTING,
MergePolicyConfig.DEFAULT_MAX_TIME_BASED_MERGED_SEGMENT_SETTING,
ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING,
ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_WATERMARK_SETTING,
ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_HIGH_MAX_HEADROOM_SETTING,
ThreadPoolMergeExecutorService.INDICES_MERGE_DISK_CHECK_INTERVAL_SETTING,
TransportService.ENABLE_STACK_OVERFLOW_AVOIDANCE,
DataStreamGlobalRetentionSettings.DATA_STREAMS_DEFAULT_RETENTION_SETTING,
DataStreamGlobalRetentionSettings.DATA_STREAMS_MAX_RETENTION_SETTING,
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,10 @@ long estimatedMergeSize() {
return onGoingMerge.getMerge().getStoreMergeInfo().estimatedMergeBytes();
}

/**
 * Estimates how many bytes of this merge are still left to be written.
 * <p>
 * The value is the estimated merge size minus the total bytes the rate limiter reports
 * as already written, floored at zero so that it is never negative when more bytes
 * have been written than were originally estimated.
 *
 * @return the non-negative estimate of the remaining bytes for this merge
 */
long estimatedRemainingMergeSize() {
    final long remainingBytes = estimatedMergeSize() - rateLimiter.getTotalBytesWritten();
    return remainingBytes > 0L ? remainingBytes : 0L;
}

/**
 * Accessor for the merge memory estimate held by this task.
 *
 * @return the current value of the {@code mergeMemoryEstimateBytes} field
 */
public long getMergeMemoryEstimateBytes() {
    return this.mergeMemoryEstimateBytes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,6 @@ protected void doStart() {
IndicesService(IndicesServiceBuilder builder) {
this.settings = builder.settings;
this.threadPool = builder.threadPool;
this.threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(
threadPool,
settings
);
this.pluginsService = builder.pluginsService;
this.nodeEnv = builder.nodeEnv;
this.parserConfig = XContentParserConfiguration.EMPTY.withDeprecationHandler(LoggingDeprecationHandler.INSTANCE)
Expand All @@ -319,6 +315,12 @@ protected void doStart() {
this.bigArrays = builder.bigArrays;
this.scriptService = builder.scriptService;
this.clusterService = builder.clusterService;
this.threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(
threadPool,
settings,
clusterService.getClusterSettings(),
nodeEnv
);
this.projectResolver = builder.projectResolver;
this.client = builder.client;
this.idFieldDataEnabled = INDICES_ID_FIELD_DATA_ENABLED_SETTING.get(clusterService.getSettings());
Expand Down Expand Up @@ -366,7 +368,8 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon
indicesFieldDataCache,
cacheCleaner,
indicesRequestCache,
indicesQueryCache
indicesQueryCache,
threadPoolMergeExecutorService
);
} catch (IOException e) {
throw new UncheckedIOException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,17 @@ public void setUp() throws Exception {
emptyMap()
);
threadPool = new TestThreadPool("test");
threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(threadPool, settings);
circuitBreakerService = new NoneCircuitBreakerService();
PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings);
bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.REQUEST);
scriptService = new ScriptService(settings, Collections.emptyMap(), Collections.emptyMap(), () -> 1L);
clusterService = ClusterServiceUtils.createClusterService(threadPool);
nodeEnvironment = new NodeEnvironment(settings, environment);
threadPoolMergeExecutorService = ThreadPoolMergeExecutorService.maybeCreateThreadPoolMergeExecutorService(
threadPool,
settings,
nodeEnvironment
);
mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry();
indexNameExpressionResolver = TestIndexNameExpressionResolver.newInstance(threadPool.getThreadContext());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.MergeTask;
import org.elasticsearch.index.engine.ThreadPoolMergeScheduler.Schedule;
import org.elasticsearch.index.merge.OnGoingMerge;
Expand All @@ -21,6 +22,7 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.mockito.ArgumentCaptor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
Expand Down Expand Up @@ -56,9 +58,13 @@

public class ThreadPoolMergeExecutorServiceTests extends ESTestCase {

public void testNewMergeTaskIsAbortedWhenThreadPoolIsShutdown() {
TestThreadPool testThreadPool = new TestThreadPool("test");
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool);
public void testNewMergeTaskIsAbortedWhenThreadPoolIsShutdown() throws IOException {
TestThreadPool testThreadPool = new TestThreadPool("test", Settings.EMPTY);
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(
testThreadPool,
Settings.EMPTY,
newNodeEnvironment(Settings.EMPTY)
);
// shutdown the thread pool
testThreadPool.shutdown();
MergeTask mergeTask = mock(MergeTask.class);
Expand All @@ -80,7 +86,11 @@ public void testEnqueuedAndBackloggedMergesAreStillExecutedWhenThreadPoolIsShutd
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount)
.build();
TestThreadPool testThreadPool = new TestThreadPool("test", settings);
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool);
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(
testThreadPool,
settings,
newNodeEnvironment(settings)
);
var countingListener = new CountingMergeEventListener();
threadPoolMergeExecutorService.registerMergeEventListener(countingListener);
assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount));
Expand Down Expand Up @@ -191,7 +201,11 @@ public void testTargetIORateChangesWhenSubmittingMergeTasks() throws Exception {
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount)
.build();
try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) {
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool);
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(
testThreadPool,
settings,
newNodeEnvironment(settings)
);
assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount));
Semaphore runMergeSemaphore = new Semaphore(0);
AtomicInteger submittedIOThrottledMergeTasks = new AtomicInteger();
Expand Down Expand Up @@ -271,7 +285,11 @@ public void testIORateIsAdjustedForRunningMergeTasks() throws Exception {
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount)
.build();
try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) {
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool);
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(
testThreadPool,
settings,
newNodeEnvironment(settings)
);
assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount));
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE);
Semaphore runMergeSemaphore = new Semaphore(0);
Expand Down Expand Up @@ -333,38 +351,39 @@ public void testIORateIsAdjustedForRunningMergeTasks() throws Exception {
}
}

public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsSpeedy() {
public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsSpeedy() throws IOException {
// the executor runs merge tasks at a faster rate than the rate that merge tasks are submitted
int submittedVsExecutedRateOutOf1000 = randomIntBetween(0, 250);
testIORateAdjustedForSubmittedTasks(randomIntBetween(50, 1000), submittedVsExecutedRateOutOf1000, randomIntBetween(0, 5));
// executor starts running merges only after a considerable amount of merge tasks have already been submitted
testIORateAdjustedForSubmittedTasks(randomIntBetween(50, 1000), submittedVsExecutedRateOutOf1000, randomIntBetween(5, 50));
}

public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsSluggish() {
public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsSluggish() throws IOException {
// the executor runs merge tasks at a slower rate than the rate that merge tasks are submitted
int submittedVsExecutedRateOutOf1000 = randomIntBetween(750, 1000);
testIORateAdjustedForSubmittedTasks(randomIntBetween(50, 1000), submittedVsExecutedRateOutOf1000, randomIntBetween(0, 5));
// executor starts running merges only after a considerable amount of merge tasks have already been submitted
testIORateAdjustedForSubmittedTasks(randomIntBetween(50, 1000), submittedVsExecutedRateOutOf1000, randomIntBetween(5, 50));
}

public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsOnPar() {
public void testIORateAdjustedForSubmittedTasksWhenExecutionRateIsOnPar() throws IOException {
// the executor runs merge tasks at roughly the same rate as the rate that merge tasks are submitted
int submittedVsExecutedRateOutOf1000 = randomIntBetween(250, 750);
testIORateAdjustedForSubmittedTasks(randomIntBetween(50, 1000), submittedVsExecutedRateOutOf1000, randomIntBetween(0, 5));
// executor starts running merges only after a considerable amount of merge tasks have already been submitted
testIORateAdjustedForSubmittedTasks(randomIntBetween(50, 1000), submittedVsExecutedRateOutOf1000, randomIntBetween(5, 50));
}

private void testIORateAdjustedForSubmittedTasks(
int totalTasksToSubmit,
int submittedVsExecutedRateOutOf1000,
int initialTasksToSubmit
) {
private void testIORateAdjustedForSubmittedTasks(int totalTasksToSubmit, int submittedVsExecutedRateOutOf1000, int initialTasksToSubmit)
throws IOException {
DeterministicTaskQueue mergeExecutorTaskQueue = new DeterministicTaskQueue();
ThreadPool mergeExecutorThreadPool = mergeExecutorTaskQueue.getThreadPool();
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(mergeExecutorThreadPool);
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(
mergeExecutorThreadPool,
Settings.EMPTY,
newNodeEnvironment(Settings.EMPTY)
);
final AtomicInteger currentlySubmittedMergeTaskCount = new AtomicInteger();
final AtomicLong targetIORateLimit = new AtomicLong(ThreadPoolMergeExecutorService.START_IO_RATE.getBytes());
final AtomicReference<MergeTask> lastRunTask = new AtomicReference<>();
Expand Down Expand Up @@ -424,7 +443,11 @@ public void testMergeTasksRunConcurrently() throws Exception {
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount)
.build();
try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) {
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool);
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(
testThreadPool,
settings,
newNodeEnvironment(settings)
);
assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount));
// more merge tasks than max concurrent merges allowed to run concurrently
int totalMergeTasksCount = mergeExecutorThreadCount + randomIntBetween(1, 5);
Expand Down Expand Up @@ -465,7 +488,7 @@ public void testMergeTasksRunConcurrently() throws Exception {
assertThat(threadPoolMergeExecutorService.getRunningMergeTasks().size(), is(mergeExecutorThreadCount));
// with the other merge tasks enqueued
assertThat(
threadPoolMergeExecutorService.getQueuedMergeTasks().size(),
threadPoolMergeExecutorService.getMergeTasksQueueLength(),
is(totalMergeTasksCount - mergeExecutorThreadCount - finalCompletedTasksCount)
);
// also check thread-pool stats for the same
Expand All @@ -485,7 +508,7 @@ public void testMergeTasksRunConcurrently() throws Exception {
// there are fewer available merges than available threads
assertThat(threadPoolMergeExecutorService.getRunningMergeTasks().size(), is(finalRemainingMergeTasksCount));
// no more merges enqueued
assertThat(threadPoolMergeExecutorService.getQueuedMergeTasks().size(), is(0));
assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), is(0));
// also check thread-pool stats for the same
assertThat(threadPoolExecutor.getActiveCount(), is(finalRemainingMergeTasksCount));
assertThat(threadPoolExecutor.getQueue().size(), is(0));
Expand All @@ -504,7 +527,11 @@ public void testThreadPoolStatsWithBackloggedMergeTasks() throws Exception {
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount)
.build();
try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) {
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool);
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(
testThreadPool,
settings,
newNodeEnvironment(settings)
);
assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount));
int totalMergeTasksCount = randomIntBetween(1, 10);
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) testThreadPool.executor(ThreadPool.Names.MERGE);
Expand Down Expand Up @@ -533,7 +560,7 @@ public void testThreadPoolStatsWithBackloggedMergeTasks() throws Exception {
assertThat(threadPoolExecutor.getActiveCount(), is(backloggedMergeTasksList.size()));
assertThat(threadPoolExecutor.getQueue().size(), is(0));
}
assertThat(threadPoolMergeExecutorService.getQueuedMergeTasks().size(), is(0));
assertThat(threadPoolMergeExecutorService.getMergeTasksQueueLength(), is(0));
});
// re-enqueue backlogged merge tasks
for (MergeTask backloggedMergeTask : backloggedMergeTasksList) {
Expand All @@ -557,7 +584,11 @@ public void testBackloggedMergeTasksExecuteExactlyOnce() throws Exception {
.put(EsExecutors.NODE_PROCESSORS_SETTING.getKey(), mergeExecutorThreadCount)
.build();
try (TestThreadPool testThreadPool = new TestThreadPool("test", settings)) {
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(testThreadPool);
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(
testThreadPool,
settings,
newNodeEnvironment(settings)
);
assertThat(threadPoolMergeExecutorService.getMaxConcurrentMerges(), equalTo(mergeExecutorThreadCount));
// many merge tasks concurrently
int mergeTaskCount = randomIntBetween(10, 100);
Expand Down Expand Up @@ -613,10 +644,14 @@ public void testBackloggedMergeTasksExecuteExactlyOnce() throws Exception {
}
}

public void testMergeTasksExecuteInSizeOrder() {
public void testMergeTasksExecuteInSizeOrder() throws IOException {
DeterministicTaskQueue mergeExecutorTaskQueue = new DeterministicTaskQueue();
ThreadPool mergeExecutorThreadPool = mergeExecutorTaskQueue.getThreadPool();
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(mergeExecutorThreadPool);
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = getThreadPoolMergeExecutorService(
mergeExecutorThreadPool,
Settings.EMPTY,
newNodeEnvironment(Settings.EMPTY)
);
DeterministicTaskQueue reEnqueueBackloggedTaskQueue = new DeterministicTaskQueue();
int mergeTaskCount = randomIntBetween(10, 100);
// sort merge tasks available to run by size
Expand Down Expand Up @@ -696,13 +731,21 @@ public void onMergeAborted(OnGoingMerge merge) {
}
}

static ThreadPoolMergeExecutorService getThreadPoolMergeExecutorService(ThreadPool threadPool) {
static ThreadPoolMergeExecutorService getThreadPoolMergeExecutorService(
ThreadPool threadPool,
Settings settings,
NodeEnvironment nodeEnvironment
) {
ThreadPoolMergeExecutorService threadPoolMergeExecutorService = ThreadPoolMergeExecutorService
.maybeCreateThreadPoolMergeExecutorService(
threadPool,
randomBoolean()
? Settings.EMPTY
: Settings.builder().put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true).build()
? settings
: Settings.builder()
.put(settings)
.put(ThreadPoolMergeScheduler.USE_THREAD_POOL_MERGE_SCHEDULER_SETTING.getKey(), true)
.build(),
nodeEnvironment
);
assertNotNull(threadPoolMergeExecutorService);
assertTrue(threadPoolMergeExecutorService.allDone());
Expand Down
Loading