feat(hive-sync): parallelize DROP partitions in HiveQL sync mode by nsivabalan · Pull Request #19033 · apache/hudi · GitHub

feat(hive-sync): parallelize DROP partitions in HiveQL sync mode#19033

Open
nsivabalan wants to merge 2 commits into
apache:masterfrom
nsivabalan:hiveql-parallelize-drop

Conversation

@nsivabalan

@nsivabalan nsivabalan commented Jun 17, 2026

Contributor

Describe the issue this Pull Request addresses

Closes #18331 (continued).

The HiveQL sync mode's dropPartitionsToTable walks the partition list one element at a time on the session IMetaStoreClient (Thrift). On tables with thousands of partitions, a single sync that needs to drop a non-trivial subset becomes a multi-minute serial Thrift loop.

The companion PR #18984 introduced parallel ADD/UPDATE/TOUCH for HiveQL via HiveDriverPool. DROP can't reuse that pool because it goes through IMetaStoreClient.dropPartition, not the Hive Driver. This PR adds the analogous Thrift-side fan-out so the opt-in batching flag covers all four partition operations end-to-end.

This PR is built on top of #18984. Once #18984 merges to master, this PR rebases trivially.

Summary and Changelog

When hoodie.datasource.hive_sync.batching.enabled=true and sync mode is HIVEQL, partition drops are split into batches of hoodie.datasource.hive_sync.batch_num and dispatched across a pool of worker threads, each holding its own IMetaStoreClient. Default off; existing behavior is unchanged.

Changes:

  • HiveQueryDDLExecutor.dropPartitionsToTable: previously iterated the partition list sequentially on the session metastore client. It now splits the list into batches of HIVE_BATCH_SYNC_PARTITION_NUM and either runs them sequentially on the session client (default) or fans them out across the pool (when batching is enabled). First-error semantics apply: the first failure is thrown and subsequent failures are logged at WARN. Each worker iterates its chunk one partition at a time, because Hive has no batch-drop primitive matching dropPartition's semantics; the win comes from running chunks across independent Thrift clients in parallel.

  • HoodieHiveSyncClient: builds an IMetaStoreClientPool for the HIVEQL mode branches (mode=hiveql and the legacy default-mode branch) when batching is enabled. The pool is closed in close() before Hive.closeCurrent() so the RetryingMetaStoreClient instances release their Thrift sockets without racing the ThreadLocal Hive cleanup. It falls back to a single-client sequential drop when hoodie.datasource.hive_sync.use_spark_catalog=true (the Spark catalog client is constructed reflectively and isn't compatible with the direct RetryingMetaStoreClient pool path).

  • util/IMetaStoreClientPool: new, ~207 lines. A bounded blocking pool of RetryingMetaStoreClient instances backed by a fixed-size ExecutorService. A worker borrows a client for the duration of the function (pool.run(Function)), and the pool returns the client to its free list when the function returns. Unlike HiveDriverPool (which is constrained by Hive 2.x's thread-bound Driver/SessionState), IMetaStoreClient is a Thrift socket, so the pool is a plain ArrayBlockingQueue + ExecutorService with no per-slot dedicated thread required.
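
For readers who want a picture of the borrow/return pattern described in the last bullet, here is a minimal sketch. It is not the PR's IMetaStoreClientPool: the class name BoundedClientPool, the generic client type, and the factory parameter are all hypothetical, and a plain AutoCloseable stands in for RetryingMetaStoreClient so the example compiles without Hive on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical sketch of a bounded borrow/return pool; not the PR's implementation. */
final class BoundedClientPool<C extends AutoCloseable> implements AutoCloseable {
  private final BlockingQueue<C> free;     // idle clients waiting to be borrowed
  private final ExecutorService executor;  // shared fixed-size worker pool

  BoundedClientPool(int size, Supplier<C> factory) {
    this.free = new ArrayBlockingQueue<>(size);
    for (int i = 0; i < size; i++) {
      free.add(factory.get());
    }
    this.executor = Executors.newFixedThreadPool(size);
  }

  ExecutorService executor() {
    return executor;
  }

  /** Borrows a client, applies fn, and always hands the client back. */
  <R> R run(Function<C, R> fn) {
    C client;
    try {
      client = free.take(); // blocks while every client is in use
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for a client", e);
    }
    try {
      return fn.apply(client);
    } finally {
      free.add(client); // cannot overflow: only borrowed clients come back
    }
  }

  /** Stops the workers and closes idle clients; assumes nothing is still borrowed. */
  @Override
  public void close() {
    executor.shutdown();
    List<C> idle = new ArrayList<>();
    free.drainTo(idle);
    RuntimeException first = null;
    for (C client : idle) {
      try {
        client.close();
      } catch (Exception e) {
        if (first == null) {
          first = new IllegalStateException("Failed to close pooled client", e);
        }
      }
    }
    if (first != null) {
      throw first;
    }
  }
}
```

Because any thread can use any borrowed client, the queue alone bounds concurrency; that is the property the bullet contrasts with the thread-bound Driver slots.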

Design constraint note: DROP and ADD/UPDATE/TOUCH live on opposite concurrency models in HiveQL mode. ADD/UPDATE/TOUCH go through the Hive Driver (thread-bound, SessionState is ThreadLocal) and use HiveDriverPool. DROP goes through the Thrift client and uses IMetaStoreClientPool. The two pools are constructed only when batching.enabled=true and torn down together in HoodieHiveSyncClient.close().

Impact

User-facing impact is opt-in only: behavior is unchanged unless hoodie.datasource.hive_sync.batching.enabled=true.

Risk Level

Medium.

Mitigation:

  • New end-to-end test testHiveQLDropPartitionsWithBatching creates 8 partitions and drops 4 through the parallel pool with threads=3 and batch_num=2 (so multiple drop batches race against each other on independent Thrift clients) and asserts the remaining partition set.
  • Existing tests across all three sync modes (hms / hiveql / jdbc) pass unchanged.
  • With batching.enabled=false, the new runDropBatches helper falls through to a sequential applyDropBatch against the session client — semantically equivalent to the prior loop.
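
To make the equivalence claim in the last bullet concrete, here is a small sketch of splitting a partition list into batches and walking them sequentially. The class and method names (DropBatchingSketch, partition, runSequentially) are hypothetical and are not the PR's runDropBatches or applyDropBatch; the drop itself is represented by an arbitrary callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper; the names are illustrative and not taken from the PR. */
final class DropBatchingSketch {
  private DropBatchingSketch() {}

  /** Splits items into consecutive batches of at most batchSize elements. */
  static <T> List<List<T>> partition(List<T> items, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<List<T>> batches = new ArrayList<>();
    for (int start = 0; start < items.size(); start += batchSize) {
      int end = Math.min(start + batchSize, items.size());
      batches.add(new ArrayList<>(items.subList(start, end)));
    }
    return batches;
  }

  /** Sequential path: applies the drop to every partition, batch by batch, in list order. */
  static <T> void runSequentially(List<List<T>> batches, Consumer<T> dropOne) {
    for (List<T> batch : batches) {
      for (T partition : batch) {
        dropOne.accept(partition);
      }
    }
  }
}
```

Walking the batches in order visits the partitions in the same order as the original flat loop, which is why the sequential fallback can be described as semantically equivalent. Under this sketch, the four dropped partitions in the new test with batch_num=2 would form two batches.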

Documentation Update

No new configs; no docsite change required.

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

@hudi-agent hudi-agent left a comment

Contributor

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the contribution! This PR extends the parallel batching infrastructure to HiveQL DROP partitions by wiring an IMetaStoreClientPool into HiveQueryDDLExecutor. The structure is cleanly gated behind the existing opt-in flag, and the test coverage for both the new pool plumbing and the end-to-end DROP path is solid. A couple of correctness concerns around resource cleanup and SessionState sharing are worth a closer look, along with two nits: a field that exists purely for documentation and a duplicated futures-error loop. Please take a look at the inline comments; after that, this should be ready for a Hudi committer or PMC member to take it from here.

🤖 If ddlExecutor.close() throws, control jumps to the outer catch (Exception e) { log.error(...) } (which doesn't rethrow), so partitionClientPool.close() never runs and the Thrift sockets + worker threads leak silently. Could you wrap the pool cleanup in a try-finally so it runs regardless of whether ddlExecutor.close() succeeds? E.g. try { ddlExecutor.close(); } finally { if (partitionClientPool != null) { ... } }.

- AI-generated; verify before applying. React 👍/👎 to flag quality.
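
A slightly fuller version of the try-finally suggested above, as a sketch only. The class CloseOrderSketch and its closeAll method are hypothetical, and both resources are modeled as plain AutoCloseable rather than the real executor and pool types.

```java
/** Hypothetical stand-in for the close() ordering discussed above. */
final class CloseOrderSketch {
  private CloseOrderSketch() {}

  /** Closes the executor first; the pool is closed even if that throws. */
  static void closeAll(AutoCloseable ddlExecutor, AutoCloseable partitionClientPool) throws Exception {
    try {
      ddlExecutor.close();
    } finally {
      // Runs on both the normal and the exceptional path.
      if (partitionClientPool != null) {
        partitionClientPool.close();
      }
    }
  }
}
```

One caveat with a bare try-finally: if the pool's close() also throws, its exception replaces the one from the executor. Whether that matters here depends on how the outer catch logs the failure.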

// SessionState is shared across workers; build it once on the first call (with
// currentDatabase already set) and attach it to each worker's thread-local on
// subsequent calls. The database is a pool-wide property and never changes
// across workers, so setting it once at construction time is sufficient.

🤖 The shared SessionState pattern here makes me a bit nervous — every worker thread calls SessionState.start(sharedSessionState) and then runs concurrent Driver.run() against the same instance. Hive's SessionState has plenty of mutable fields the Driver writes during query execution (e.g. currentDatabase via USE, hiveOperation, cmd, lineageState, tablesAlreadyMarkedAsRead). Are we confident none of these get written concurrently by parallel ALTER PARTITION / TOUCH statements? @yihua could you weigh in on whether sharing SessionState across worker threads is safe in practice, or whether each worker should construct its own (paying the SessionState init cost in exchange for isolation)?

- AI-generated; verify before applying. React 👍/👎 to flag quality.

🤖 Sharing one SessionState across worker threads is risky here. Hive stores the active SessionState in a ThreadLocal (SessionState.start() sets tss.set(...)), so this pattern installs the same mutable instance into N threads' ThreadLocals. The Hive Driver isn't documented as thread-safe, and Driver execution mutates session-scoped state — currentDatabase (USE), conf overrides, and the compile/execute lineage/read-entity sets — so concurrent ALTER PARTITION/TOUCH runs can interleave on those fields. Worth confirming whether any of the batched statements can issue a USE or set conf; if so, per-worker SessionState (or serializing the session-mutating bits) is the safer default even at the init cost.
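
The aliasing concern can be shown without Hive. The sketch below uses a hypothetical FakeSession class as a stand-in for SessionState and a plain ThreadLocal; it demonstrates only that installing one instance into several threads' ThreadLocals makes a write on one thread visible on another. It says nothing about which fields Hive's Driver actually writes for the batched statements.

```java
/** Hypothetical mutable session object; a stand-in, not Hive's SessionState. */
final class FakeSession {
  volatile String currentDatabase = "default";
}

final class ThreadLocalSharingDemo {
  private static final ThreadLocal<FakeSession> CURRENT = new ThreadLocal<>();

  private ThreadLocalSharingDemo() {}

  /**
   * One thread installs writerSession and changes its database; a second thread
   * installs readerSession and reports the database it observes.
   */
  static String writeThenRead(FakeSession writerSession, FakeSession readerSession) {
    String[] seen = new String[1];
    Thread writer = new Thread(() -> {
      CURRENT.set(writerSession);
      CURRENT.get().currentDatabase = "db_a";
    });
    Thread reader = new Thread(() -> {
      CURRENT.set(readerSession);
      seen[0] = CURRENT.get().currentDatabase;
    });
    try {
      writer.start();
      writer.join(); // run the threads one after another so the result is deterministic
      reader.start();
      reader.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return seen[0];
  }
}
```

Passing the same FakeSession for both arguments models the shared pattern: the reader observes the writer's change. Passing two separate instances models per-worker state, where the reader still sees the initial value.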

for (List<String> batch : batches) {
  futures.add(partitionClientPool.executor().submit(() ->
      partitionClientPool.run(poolClient -> {
        action.apply(poolClient, batch);

🤖 When a Future fails, f.get() throws ExecutionException wrapping the real cause. firstError then stores the ExecutionException, which gets rethrown and caught by the outer catch (Exception e) in addPartitionsToTable/dropPartitionsToTable/registerAlterPartitionEvent, which wraps it again in HoodieHiveSyncException. The result is HoodieHiveSyncException -> ExecutionException -> RealCause — one more wrapping layer than the sequential path produced. HiveDriverPool.awaitAll already has an unwrap() helper; could we do the same here for consistency?

- AI-generated; verify before applying. React 👍/👎 to flag quality.
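
As a sketch of what such an unwrap step could look like: the helper below peels ExecutionException layers until it reaches the underlying cause. The class and method names are hypothetical, and HiveDriverPool's own unwrap() is not shown on this page, so this is an assumption about the general technique rather than a copy of that helper.

```java
import java.util.concurrent.ExecutionException;

/** Hypothetical helper illustrating ExecutionException unwrapping. */
final class FutureErrors {
  private FutureErrors() {}

  /** Returns the innermost cause beneath any ExecutionException wrappers. */
  static Throwable unwrap(Throwable error) {
    Throwable current = error;
    while (current instanceof ExecutionException && current.getCause() != null) {
      current = current.getCause();
    }
    return current;
  }
}
```

Storing unwrap(e) in firstError instead of e would keep the resulting chain at HoodieHiveSyncException -> RealCause, matching the sequential path described in the comment.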

// Present only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HIVEQL (explicit
// or legacy default). Owned by HiveQueryDDLExecutor; this field is kept for
// reference only — close() is delegated through ddlExecutor.close().
private Option<HiveDriverPool> partitionDriverPool = Option.empty();

🤖 nit: the comment says this field is 'kept for reference only — close() is delegated through ddlExecutor.close()', which means it's never read after being passed into HiveQueryDDLExecutor. Could you make it a local variable in the constructor instead? A field that isn't read after construction will make future readers wonder what lifecycle role it plays.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

* semantics, so each worker still iterates its chunk one partition at a time — the
* win is fanning chunks across independent Thrift clients.
*/
private void runDropBatches(String tableName, List<List<String>> batches) throws Exception {

🤖 nit: the parallel-path error-collection loop inside runDropBatches (Exception firstError = null; for (Future<Void> f : futures) …) is nearly identical to the one in HMSDDLExecutor.runBatches. Since HiveDriverPool already has a well-factored awaitAll(List<Future<?>>) for exactly this pattern, it might be worth adding a similar awaitAll(List<Future<Void>>) to IMetaStoreClientPool so both callers can delegate to it instead.

- AI-generated; verify before applying. React 👍/👎 to flag quality.
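
A possible shape for the shared helper, sketched under assumptions: the class name AwaitAllSketch is hypothetical, java.util.logging stands in for the project's logger, and the first-error behavior follows the PR description (first failure thrown, later failures logged at WARN). The real awaitAll in HiveDriverPool may differ.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical first-error await helper; not the PR's code. */
final class AwaitAllSketch {
  private static final Logger LOG = Logger.getLogger(AwaitAllSketch.class.getName());

  private AwaitAllSketch() {}

  /** Waits for every future, rethrows the first failure, and logs the rest. */
  static void awaitAll(List<? extends Future<?>> futures) throws Exception {
    Exception firstError = null;
    for (Future<?> future : futures) {
      try {
        future.get();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw e;
      } catch (ExecutionException e) {
        // Prefer the real cause so callers do not see an extra wrapping layer.
        Throwable cause = e.getCause();
        Exception failure = cause instanceof Exception ? (Exception) cause : e;
        if (firstError == null) {
          firstError = failure;
        } else {
          LOG.log(Level.WARNING, "Additional batch failure", failure);
        }
      }
    }
    if (firstError != null) {
      throw firstError;
    }
  }
}
```

Both runDropBatches and HMSDDLExecutor.runBatches could then collect their futures and delegate to one such method, which is the deduplication the nit asks for.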

@codecov-commenter

Codecov Report

❌ Patch coverage is 80.20566% with 77 lines in your changes missing coverage. Please review.
✅ Project coverage is 67.65%. Comparing base (8933224) to head (7022e7d).
⚠️ Report is 51 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...java/org/apache/hudi/hive/util/HiveDriverPool.java | 82.48% | 18 Missing and 6 partials ⚠️ |
| ...rg/apache/hudi/hive/util/IMetaStoreClientPool.java | 72.05% | 16 Missing and 3 partials ⚠️ |
| ...org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java | 77.61% | 9 Missing and 6 partials ⚠️ |
| .../java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java | 84.12% | 7 Missing and 3 partials ⚠️ |
| ...ava/org/apache/hudi/hive/HoodieHiveSyncClient.java | 67.85% | 8 Missing and 1 partial ⚠️ |
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #19033      +/-   ##
============================================
- Coverage     68.26%   67.65%   -0.62%     
- Complexity    29513    29865     +352     
============================================
  Files          2542     2564      +22     
  Lines        142627   145490    +2863     
  Branches      17788    18371     +583     
============================================
+ Hits          97369    98428    +1059     
- Misses        37253    38828    +1575     
- Partials       8005     8234     +229     
| Flag | Coverage Δ |
|---|---|
| common-and-other-modules | 44.87% <80.20%> (+0.08%) ⬆️ |
| hadoop-mr-java-client | 44.67% <ø> (-0.08%) ⬇️ |
| spark-client-hadoop-common | 48.28% <ø> (+0.24%) ⬆️ |
| spark-java-tests | 48.15% <3.08%> (-0.62%) ⬇️ |
| spark-scala-tests | 44.37% <7.45%> (-0.48%) ⬇️ |
| utilities | 37.09% <4.62%> (-0.16%) ⬇️ |

| Files with missing lines | Coverage Δ |
|---|---|
| ...ava/org/apache/hudi/hive/HiveSyncConfigHolder.java | 99.21% <100.00%> (+0.08%) ⬆️ |
| ...rg/apache/hudi/hive/ddl/QueryBasedDDLExecutor.java | 87.12% <100.00%> (+1.07%) ⬆️ |
| ...ava/org/apache/hudi/hive/HoodieHiveSyncClient.java | 50.12% <67.85%> (+1.20%) ⬆️ |
| .../java/org/apache/hudi/hive/ddl/HMSDDLExecutor.java | 80.86% <84.12%> (+0.51%) ⬆️ |
| ...org/apache/hudi/hive/ddl/HiveQueryDDLExecutor.java | 70.40% <77.61%> (+4.72%) ⬆️ |
| ...rg/apache/hudi/hive/util/IMetaStoreClientPool.java | 72.05% <72.05%> (ø) |
| ...java/org/apache/hudi/hive/util/HiveDriverPool.java | 82.48% <82.48%> (ø) |

... and 156 files with indirect coverage changes

@github-actions github-actions bot added the `size:XL` label (PR with lines of changes > 1000) on Jun 22, 2026
nsivabalan and others added 2 commits June 23, 2026 13:25
Adds an opt-in pool that splits HiveQL partition DDL (add/update/touch/drop)
into batches of `hoodie.datasource.hive_sync.batch_num` and dispatches them
in parallel across a pool of single-thread workers. Each worker owns its
own Hive `Driver` + `SessionState` (both thread-bound in Hive 2.x), so the
fan-out is implemented as a fixed pool of dedicated single-thread executors
rather than a shared thread pool.

Table-level operations (create/alter table, last commit time, writer version)
continue to use the single session `Driver`. Partition-phase SQL lists run
through the pool only when `hoodie.datasource.hive_sync.batching.enabled`
is set to true; default off, existing behavior unchanged.

Hive 2.x's `ALTER PARTITION SET LOCATION` ignores db.table qualifiers and
uses the connection's current database, so each worker is primed with the
correct USE statement before any partition ALTER is dispatched.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Follow-up to the HiveQL partition batching introduced in the prior commit
(PR apache#18984). HiveQueryDDLExecutor.dropPartitionsToTable goes through
IMetaStoreClient.dropPartition (Thrift), not the Hive Driver — so it can't
reuse HiveDriverPool. This change wires an IMetaStoreClientPool into
HiveQueryDDLExecutor and uses it to fan drop batches across the pool's
worker threads.

Behavior:
- batching.enabled=false (default): unchanged. dropPartitionsToTable
  iterates the partition list sequentially on the session metastore
  client, exactly as before.
- batching.enabled=true: partitions are split into batches of
  HIVE_BATCH_SYNC_PARTITION_NUM, batches fan out across the pool's
  workers (one IMetaStoreClient per worker), and first-error semantics
  match the existing HMS-mode implementation (first failure thrown,
  subsequent failures logged at WARN).

HoodieHiveSyncClient now builds partitionClientPool for the HIVEQL mode
branches (explicit `mode=hiveql` and the legacy default-mode branch).
The pool is closed in HoodieHiveSyncClient.close() before
Hive.closeCurrent() so the RetryingMetaStoreClient instances release
their Thrift sockets without racing the ThreadLocal Hive cleanup.

IMetaStoreClientPool is brought in as part of this PR. An analogous
parallelization for HMS sync mode (separately tracked) would introduce
the same class; whichever lands first owns the file going forward.

Tests:
- New end-to-end test: testHiveQLDropPartitionsWithBatching — creates
  8 partitions, drops 4 through the parallel pool path with threads=3
  and batch_num=2 (so multiple drop batches race), asserts the
  remaining partition set matches.

Configs: no new configs. Reuses hoodie.datasource.hive_sync.batching.*
from apache#18984.

Related: apache#18331

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@nsivabalan nsivabalan force-pushed the hiveql-parallelize-drop branch from 7022e7d to 5e3c1fd Compare June 23, 2026 20:57

@hudi-agent hudi-agent left a comment

⚠️ 🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for working on this! This PR extends the opt-in HiveQL batching to partition drops by fanning batches across an IMetaStoreClientPool, and routes ADD/UPDATE/TOUCH through a runSQLs hook backed by HiveDriverPool. The concurrency on the drop path itself looks sound (disjoint batches, partition-row-only ops, thread-safe DateTimeFormatter-based extractors). One error-path resource-leak edge case is worth double-checking in the inline comment, and there are a couple of small naming and consistency nits below; otherwise the code is clean and well-documented. Please take a look at the inline comments; after that, this should be ready for a Hudi committer or PMC member to take it from here.

    break;
  case HIVEQL:
    ddlExecutor = new HiveQueryDDLExecutor(config, this.client);
    this.partitionDriverPool = maybeBuildHiveDriverPool(config);

🤖 These two pools are built eagerly here, but if maybeBuildPartitionClientPool(...) or the HiveQueryDDLExecutor constructor on the next line throws, the outer catch (line ~148) just rethrows — and since the constructor failed, close() is never called, so the already-built pool's worker threads + Thrift connections leak. Could we close partitionDriverPool/partitionClientPool in the catch before rethrowing? Same pattern in the legacy default-mode branch below.

⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.
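
A sketch of the cleanup-before-rethrow idea, under stated assumptions: TwoPoolOwner is a hypothetical stand-in for HoodieHiveSyncClient, the two pools are modeled as plain AutoCloseable values built by supplier parameters, and the restOfConstructor parameter represents whatever runs after the pools exist (for example, constructing the DDL executor).

```java
import java.util.function.Supplier;

/** Hypothetical owner of two pools; a stand-in, not HoodieHiveSyncClient. */
final class TwoPoolOwner implements AutoCloseable {
  private final AutoCloseable driverPool;
  private final AutoCloseable clientPool;

  TwoPoolOwner(Supplier<AutoCloseable> driverPoolFactory,
               Supplier<AutoCloseable> clientPoolFactory,
               Runnable restOfConstructor) {
    AutoCloseable builtDriverPool = null;
    AutoCloseable builtClientPool = null;
    try {
      builtDriverPool = driverPoolFactory.get();
      builtClientPool = clientPoolFactory.get();
      restOfConstructor.run(); // may throw after both pools exist
    } catch (RuntimeException e) {
      // close() never runs for a half-built object, so release the pools here.
      closeAndSuppress(builtClientPool, e);
      closeAndSuppress(builtDriverPool, e);
      throw e;
    }
    this.driverPool = builtDriverPool;
    this.clientPool = builtClientPool;
  }

  /** Closes the resource if present; a close failure is attached to the primary error. */
  private static void closeAndSuppress(AutoCloseable resource, Throwable primary) {
    if (resource == null) {
      return;
    }
    try {
      resource.close();
    } catch (Exception closeFailure) {
      primary.addSuppressed(closeFailure);
    }
  }

  @Override
  public void close() throws Exception {
    try {
      driverPool.close();
    } finally {
      clientPool.close();
    }
  }
}
```

Attaching close failures as suppressed exceptions keeps the original construction error as the one the caller sees.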

// this class; closed in close() before Hive.closeCurrent(). HiveQueryDDLExecutor
// uses it only for DROP (Hive Thrift, not Hive Driver) — see IMetaStoreClientPool
// javadoc.
private IMetaStoreClientPool partitionClientPool;

🤖 nit: partitionClientPool is a raw nullable while partitionDriverPool just below (line 101) is wrapped in Option<> — both represent the same "batching enabled" condition, so it might be worth making the container type consistent (either both Option<>, or both nullable with the same guard pattern) to avoid a future reader wondering if there's a semantic difference.

⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.

public static final ConfigProperty<Integer> HIVE_SYNC_BATCHING_THREADS = ConfigProperty
    .key("hoodie.datasource.hive_sync.batching.threads")
    .defaultValue(4)
    .markAdvanced()

🤖 nit: the doc says "Pool size (number of worker resources) and worker-thread count" — could you simplify to something like "Number of worker threads (and pooled metastore clients) for parallel partition sync when hoodie.datasource.hive_sync.batching.enabled is true"? The current wording lists "pool size" and "worker-thread count" as if they're distinct, but they're the same number.

⚠️ AI-generated; verify before applying. React 👍/👎 to flag quality.

Labels

size:XL PR with lines of changes > 1000

Development

Successfully merging this pull request may close these issues.

[IMPROVEMENT] Hive Sync partition operations lack batching and parallelism, causing 4x-9x slowdown for large tables

4 participants