Merge branch nadav/feature-split-merges into main#6461
Conversation
Make MergeSchedulerService optional
* Spawn merge pipeline from new compactor * new line because adrien is annoyed about it * comment
* Wire up compactor service * lints
* Implement compactor pipeline update logic * lints and fixes * comments, lints, test fixes, other things
Refactor migrations
…xplicitly enabled (#6453) * Revert to using existing merge flow when standalone compactors isnt explicitly enabled * Pull out some configs
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: e60210f3d1
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
There was a problem hiding this comment.
Preserve indexer standalone-compactor flag from config
build_and_validate unconditionally overwrites indexer.enable_standalone_compactors from a separate top-level ConfigValue, whose default is false; this means setting indexer.enable_standalone_compactors: true in the indexer section is silently ignored unless QW_ENABLE_STANDALONE_COMPACTORS (or the top-level field) is also set. In practice, operators enabling standalone compaction through the documented indexer config will still run in legacy mode and may never start external compactor flows.
Useful? React with 👍 / 👎.
| .wait_for(COMPACTION_SERVICE_DISCOVERY_TIMEOUT, |connections| { | ||
| !connections.is_empty() | ||
| }) |
There was a problem hiding this comment.
Validate planner endpoint, not just janitor presence
On compactor-only nodes, discovery succeeds as soon as any janitor connection exists, but this does not guarantee that the janitor exposes the compaction planner RPC (for example, janitor nodes running with standalone compactors disabled). In that case startup proceeds with a client to a node that cannot serve report_status, causing repeated runtime failures instead of a clear startup error.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: ce469888b6
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| let indexing_split_cache: Arc<IndexingSplitCache> = if node_config | ||
| .is_service_enabled(QuickwitService::Indexer) | ||
| && node_config.is_service_enabled(QuickwitService::Compactor) |
There was a problem hiding this comment.
Enable split cache when indexer runs without compactor
This condition now falls back to IndexingSplitCache::no_caching() unless both indexer and compactor services are enabled, which disables the indexer split cache in common indexer-only deployments. Before this change, IndexingService::new always opened the cache from indexer_config; with the new wiring, legacy merge pipelines on indexer-only nodes repeatedly re-download splits instead of reusing local cache, causing a significant performance and bandwidth regression.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 7054e1d339
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| if !has_healthy { | ||
| self.record_pipeline_duration(); | ||
| info!(task_id=%self.task_id, "all compaction pipeline actors completed"); | ||
| self.status = PipelineStatus::Completed; | ||
| self.record_terminal_metrics(true); |
There was a problem hiding this comment.
Mark one-shot compaction pipelines without actor termination
This completion test only succeeds when every child actor reports Health::Success, but the downloader/executor/packager/uploader/publisher actors spawned for a compaction task are the normal long-lived actors and do not exit after processing a single merge message. After a successful publish they therefore stay Healthy until the progress heartbeat check eventually marks idle actors unhealthy, so the task is never reported as Completed (and may be reported failed), leaking compactor slots and preventing the planner from receiving successful acknowledgements.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
💡 Codex Review
https://github.com/quickwit-oss/quickwit/blob/a5b1b012c6c24b0d055c40c7f6bee2f2ab2c8d7f/quickwit-compaction/src/planner/compaction_state.rs#L143-L145
Drop splits once they age out of compaction
When a tracked split reaches its maturity timestamp before it can form a merge operation (for example, a low-volume partition that never reaches the merge factor), merge_policy.operations(splits) leaves that mature split in splits, so this branch keeps the partition and its ID in needs_compaction_split_ids forever. Those stale IDs continue to be counted/excluded on every scan and can eventually consume the MAX_EXCLUDED_SPLIT_IDS budget, causing later immature splits to be hidden behind already-tracked rows instead of being discovered.
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
954d2d8 to
82fa2be
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 82fa2bef62
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| for split_id in &task.split_ids { | ||
| self.in_flight_split_ids.insert(split_id.clone()); | ||
| self.needs_compaction_split_ids.remove(split_id.as_str()); |
There was a problem hiding this comment.
Remove adopted heartbeat splits from pending vectors
When the planner has already scanned a split into needs_compaction and later receives an in-progress heartbeat for an unknown task (the recovery case described here), this only removes the split ID from needs_compaction_split_ids; the SplitMetadata remains in the needs_compaction vector. plan_partition operates on that vector, so the next planning tick can enqueue a second merge for the same split while the worker is still reporting it in progress. Remove the matching metadata from needs_compaction as well, or make planning skip IDs that are already in flight.
Useful? React with 👍 / 👎.
| if query | ||
| .excluded_split_ids | ||
| .iter() | ||
| .any(|excluded| excluded == &split.split_metadata.split_id) | ||
| { |
There was a problem hiding this comment.
Apply exclusions before immature split early returns
For compaction planner scans against the file-backed metastore, queries combine retain_immature with excluded_split_ids, but split_query_predicate returns from the maturity check above before reaching this new exclusion block. Once the first SCAN_PAGE_SIZE immature splits are already tracked, list_splits keeps returning that same excluded page, ingest_splits skips it, and the planner never advances to later splits. Move this exclusion ahead of the maturity return path or avoid returning early there.
Useful? React with 👍 / 👎.
| let Some(merge_scheduler_service) = self.merge_scheduler_service_opt.clone() else { | ||
| return Ok(None); |
There was a problem hiding this comment.
Keep Parquet merge pipelines until compactors support them
When enable_standalone_compactors disables the local merge scheduler, this also returns None for metrics/Parquet indexes, so their publishers no longer feed a ParquetMergePlanner. The new standalone compactor only plans MergeOperations from the regular splits table and does not process metrics_splits/ParquetMergeTasks, so metrics indexes in standalone mode will accumulate unmerged Parquet splits indefinitely. Keep the Parquet merge pipeline active or add equivalent standalone Parquet compaction before dropping this mailbox.
Useful? React with 👍 / 👎.
| // `MergePolicy::operations` emits at most one op per level per call, which under a backlog | ||
| // leaves the bulk of `splits` untouched per tick. Loop until no new operations are created. | ||
| loop { | ||
| let operations = merge_policy.operations(splits); |
There was a problem hiding this comment.
Run finalize merge policy in standalone compaction
With standalone compactors enabled, indexers no longer spawn local merge pipelines, so source shutdown no longer sends RunFinalizeMergePolicyAndQuit; this planner path only calls operations(). For finite or cold sources that rely on max_finalize_merge_operations, the final merge policy is never applied and straggler splits below the normal merge factor remain unmerged. Add a finalize path in the standalone planner or keep local finalization for those sources.
Useful? React with 👍 / 👎.
| ingester_opt: Option<Ingester>, | ||
|
|
||
| pub compaction_service_client_opt: Option<CompactionPlannerServiceClient>, | ||
| pub _compactor_supervisor_opt: Option<Mailbox<CompactorSupervisor>>, |
There was a problem hiding this comment.
Include compactor supervisor in liveness checks
This stores the compactor supervisor separately, but the health routes still only receive the indexer and janitor mailboxes, and CompactorSupervisor has no Healthz wiring. On compactor-only nodes, if the supervisor actor fails, /health/livez continues to return healthy because no enabled service is checked, leaving orchestration unaware that compaction has stopped. Wire the supervisor into the liveness path like the indexer/janitor services.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Yes, if the merge planner goes down, we're fucked. We need health checks on the planner and executor sides.
| let planner = CompactionPlanner::new(metastore_client.clone()); | ||
| let (mailbox, handle) = universe.spawn_builder().spawn(planner); |
There was a problem hiding this comment.
Elect a single compaction planner per cluster
In clusters with more than one janitor, this starts an independent in-memory CompactionPlanner on every janitor, while compactor-only nodes use a balanced Janitor channel for report_status. A task can be assigned by one planner and reported to another, and each planner scans the same Published immature splits without shared state, so duplicate compactions and spurious timeouts/reassignments are likely in the default multi-janitor topology. Use a single elected planner or persist/shard assignment state before enabling multiple planners.
Useful? React with 👍 / 👎.
| let mut doc_mappers = HashMap::new(); | ||
| doc_mappers.insert(*doc_mapping_uid, doc_mapper); |
There was a problem hiding this comment.
Do not alias current doc mapper to old split UIDs
When an index has old immature splits from a previous doc mapping, get_for_split passes that old doc_mapping_uid here, but this code fetches only the current index metadata and stores the current doc mapper under the old UID. The resulting assignment serializes self.config.doc_mapping, so standalone compactors can merge old-schema splits with the new schema after a mapping update. Verify that the fetched config's UID matches the split UID, or retain/fetch the historical doc mapping for that UID.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 3220d844d2
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| let operations = merge_policy.operations(splits); | ||
| if operations.is_empty() { | ||
| break; |
There was a problem hiding this comment.
Drop tracked splits once they become mature
For sparse sources or partitions that never reach the merge factor, a split can age past its maturation period while sitting in needs_compaction. The merge policies skip mature splits by leaving them in splits, so operations() returns empty here and the later is_empty() cleanup never removes their IDs from needs_compaction_split_ids; those stale IDs are then excluded on every metastore scan and can grow unbounded on long-running planners. Re-evaluate and remove now-mature splits from the tracked set before breaking out.
Useful? React with 👍 / 👎.
| -- The partial predicate is restricted to split_state = 'Published' because | ||
| -- partial-index predicates must be IMMUTABLE; "now()" cannot appear here. |
There was a problem hiding this comment.
i'm not entirely convinced by the use of a partial index here. Moving from Staged->Published and Published->MarkedForDeletion both need to write to this index, and Published should be the majority of splits at any given point in time, i don't think having a partial index here gains us much
There was a problem hiding this comment.
I agree. I've tried this before and it didn't yield any gains.
* Decommission compactors gracefully * Decommission ingesters and compactors together; Rename enum
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 0391a21327
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| let compaction_dir = node_config.data_dir_path.join("compaction"); | ||
| fs::create_dir_all(&compaction_dir)?; | ||
| let compaction_root_directory = quickwit_common::temp_dir::Builder::default() | ||
| .tempdir_in(&compaction_dir) |
There was a problem hiding this comment.
Purge stale compaction scratch directories
On compactor nodes this creates a fresh temp root under <data_dir>/compaction but never purges the parent on startup. If the process is killed while a merge is downloading or packaging splits, TempDirectory cleanup does not run, and the next start leaves the old child directories in place while creating a new one; large compaction scratch data can accumulate until the disk fills. Use the existing purge-on-startup pattern for scratch roots or otherwise remove stale children before tempdir_in.
Useful? React with 👍 / 👎.
| if self.all_indexers_migrated().await { | ||
| info!( | ||
| "all indexers report standalone compactors enabled, starting compaction scan loop" | ||
| ); | ||
| ctx.send_self_message(ScanAndPlan).await?; |
There was a problem hiding this comment.
Recheck indexer migration before scans
This one-time transition starts the ScanAndPlan loop permanently; the scan handler then reschedules itself without calling all_indexers_migrated again. If a janitor starts when no non-migrated indexers are live, then a legacy or standalone_compactors=false indexer joins later during a rolling restart or partition recovery, that indexer still runs local merge pipelines while the standalone planner keeps scanning and can compact the same Published splits concurrently. Recheck the cluster state before each scan or react to membership changes.
Useful? React with 👍 / 👎.
| Err(error) => { | ||
| error!(%error, "failed to report status to compaction planner"); | ||
| } |
There was a problem hiding this comment.
Decommission idle compactors without planner ack
When the planner RPC fails during shutdown, this branch only logs and never calls check_decommissioning_status; even an idle compactor with no in-flight pipelines remains Decommissioning, so wait_for_compactor_decommission blocks until the 300s timeout if the janitor/planner has already gone away. Check the local drain condition on errors or in the Decommission handler before requiring a planner response.
Useful? React with 👍 / 👎.
| self.needs_compaction_split_ids | ||
| .iter() | ||
| .chain(self.in_flight_split_ids.iter()) | ||
| .take(max_size) | ||
| .cloned(), |
There was a problem hiding this comment.
Avoid starving scans after the exclusion cap
When the planner tracks more than MAX_EXCLUDED_SPLIT_IDS splits, this truncates the exclusion list while scan_metastore still fetches only the first maturity-sorted page and ingest_splits skips already-tracked IDs. If the earliest page is filled with tracked splits that fell past this cap, every scan can return only work the planner discards, so later untracked immature splits are never discovered until the tracked set shrinks. Use pagination/cursors or otherwise ensure tracked IDs beyond the cap cannot occupy the whole page.
Useful? React with 👍 / 👎.
| if let Err(error) = self.spawn_task(assignment, spawn_ctx).await { | ||
| error!(%task_id, %error, "failed to spawn compaction task"); | ||
| } |
There was a problem hiding this comment.
Report assignments that fail to spawn
If spawn_task fails before inserting a pipeline, for example because the assignment JSON cannot be decoded or the storage URI cannot be resolved, the planner has already recorded that task as assigned before returning it, but this branch only logs and forgets it. Subsequent status reports contain neither in_progress nor failure for that task, leaving its splits in the planner's in-flight set until the heartbeat timeout instead of rescheduling immediately. Send a failure status for these spawn errors or reject the assignment before the planner records it.
Useful? React with 👍 / 👎.
|
|
||
| #[derive(Debug)] | ||
| struct InFlightCompaction { | ||
| split_ids: Vec<SplitId>, |
There was a problem hiding this comment.
| split_ids: Vec<SplitId>, | |
| split_ids: HashSet<SplitId>, |
?
| pub fn plan_partition( | ||
| &mut self, | ||
| partition_key: &CompactionPartitionKey, | ||
| merge_policy: &Arc<dyn MergePolicy>, |
There was a problem hiding this comment.
Arc<dyn MergePolicy> or &dyn MergePolicy
| pub task_id: TaskId, | ||
| pub index_uid: IndexUid, | ||
| pub source_id: SourceId, | ||
| pub split_ids: Vec<SplitId>, |
There was a problem hiding this comment.
HashSet? (I prefer hashset whenever possible to convey no duplicates, no ordering)
| metastore: MetastoreServiceClient, | ||
| split_store: IndexingSplitStore, | ||
| io_throughput_limiter: Option<Limiter>, | ||
| max_concurrent_split_uploads: usize, |
| @@ -0,0 +1,53 @@ | |||
| // Copyright 2021-Present Datadog, Inc. | |||
There was a problem hiding this comment.
I'm afraid the cardinality of metrics might be a problem. We should look into using the env var QW_DISABLE_PER_INDEX_METRICS
| /// so during a rolling upgrade an older caller serializes it without this key and an upgraded | ||
| /// metastore must still deserialize the query (an absent list means "no exclusion"). | ||
| #[serde(default)] | ||
| pub excluded_split_ids: Vec<SplitId>, |
There was a problem hiding this comment.
| pub excluded_split_ids: Vec<SplitId>, | |
| pub excluded_split_ids: HashSet<SplitId>, |
?
| /// migrations 1..N-1 applied and committed in `_sqlx_migrations`. The | ||
| /// operator fixes the failing migration and re-runs. | ||
| #[instrument(skip_all)] | ||
| pub(super) async fn run_migrations( |
There was a problem hiding this comment.
I think we need to spend more time thinking about this. We want to remove the transaction because we do want to run migrations queries that don't lock. However, if a migration fail, we're also left in a bad state.
| /// Uniquely identifies a merge pipeline. There exists at most one merge pipeline per | ||
| /// `(index_uid, source_id)` running on indexer at any given time fed by one or more indexing | ||
| /// pipelines. | ||
| /// TODO: Rework/remove this as part of splitting up merges. |
| ingester_opt: Option<Ingester>, | ||
|
|
||
| pub compaction_service_client_opt: Option<CompactionPlannerServiceClient>, | ||
| pub _compactor_supervisor_opt: Option<Mailbox<CompactorSupervisor>>, |
There was a problem hiding this comment.
Yes, if the merge planner goes down, we're fucked. We need health checks on the planner and executor sides.
| /// | ||
| /// The search acts receives reports of splits. | ||
| pub struct SplitCache { | ||
| pub struct SearchSplitCache { |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 95ee771fae
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Sequence)] | ||
| #[serde(into = "&'static str")] | ||
| pub enum QuickwitService { | ||
| Compactor, |
There was a problem hiding this comment.
Initialize runtimes for compactor-only nodes
Adding Compactor as a standalone service makes quickwit run --service compactor a valid deployment shape, but the CLI runtime bootstrap in quickwit-cli/src/lib.rs still initializes actor runtimes only for indexer, janitor, or control-plane services. In a compactor-only node with enable_standalone_compactors=true, serve_quickwit starts CompactorSupervisor and merge pipeline actors without the blocking/actor runtimes they rely on for merge execution, so dedicated compactor nodes can fail once they receive work. Include QuickwitService::Compactor in that runtime initialization path.
Useful? React with 👍 / 👎.
| if node_config.is_service_enabled(QuickwitService::Compactor) | ||
| && !node_config.enable_standalone_compactors | ||
| { |
There was a problem hiding this comment.
Revalidate CLI service overrides for compactor
This guard only runs while loading the config, but RunCliCommand::execute applies --service overrides to node_config.enabled_services after validation. With quickwit run --service compactor and no enable_standalone_compactors, the new validation is bypassed: the node advertises the compactor service in gossip, while serve_quickwit skips start_compactor_service because the flag is false, leaving a seemingly healthy node that never runs or reports compactions. Re-run this validation after CLI overrides or reject compactor in the override path.
Useful? React with 👍 / 👎.
| let max_concurrent_split_uploads_index = if self.merge_scheduler_service_opt.is_some() { | ||
| (self.max_concurrent_split_uploads / 2).max(1) | ||
| } else { | ||
| self.max_concurrent_split_uploads |
There was a problem hiding this comment.
Preserve a nonzero index upload budget
When standalone compactors are enabled, merge_scheduler_service_opt is None, so this branch forwards indexer.max_concurrent_split_uploads unchanged to the index uploader. Because that config is still a usize and accepts 0, a node with indexer.max_concurrent_split_uploads: 0 now initializes the index upload semaphore with zero permits and the first split upload waits forever; before this new no-merge-scheduler path the index side was clamped with .max(1). Clamp this branch or reject zero for the indexer config as well.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Skip the ingester wait after notify failures
If the initial decommission RPC fails, this still enters wait_for_ingester_decommission because the join is unconditional. In a shutdown where the ingester remains reachable for observation but did not accept Decommission (for example a transient or load-shed error on the decommission call), the status stays Ready and node shutdown waits the full 300s before quitting; before this split-notify refactor, the combined helper returned as soon as the decommission request failed. Only wait for the ingester when notify_ingester_decommission succeeded.
Useful? React with 👍 / 👎.

Description
Merges the split compaction feature branch into main.
Split compaction is gated by an env var and a config flag on the indexer config. It's non-intrusive: when it's off, everything remains as is- indexer nodes spawn merge pipelines and merge their own splits.
Standalone compactors (mergers) is experimental for now, and opt-in. It's configurable and there will be additional changes to it as usage of it increases.
How was this PR tested?
Testing on a cluster receiving a moderate amount of traffic. Unit tests and integration tests as needed.