feat(hive-sync): parallelize DROP partitions in HiveQL sync mode#19033
nsivabalan wants to merge 2 commits
hudi-agent
left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for the contribution! This PR extends the parallel batching infrastructure to HiveQL DROP partitions by wiring the existing IMetaStoreClientPool into HiveQueryDDLExecutor. The structure is cleanly gated behind the existing opt-in flag, and the test coverage for both the new pool plumbing and the end-to-end DROP path is solid. Two correctness concerns, around resource cleanup and SessionState sharing, are worth a closer look in the inline comments. There are also a couple of nits: a field that exists purely for documentation, and a duplicated futures-error loop. Please take a look at the inline comments, and this should be ready for a Hudi committer or PMC member to take it from here.
🤖 If ddlExecutor.close() throws, control jumps to the outer catch (Exception e) { log.error(...) } (which doesn't rethrow), so partitionClientPool.close() never runs and the Thrift sockets + worker threads leak silently. Could you wrap the pool cleanup in a try-finally so it runs regardless of whether ddlExecutor.close() succeeds? E.g. try { ddlExecutor.close(); } finally { if (partitionClientPool != null) { ... } }.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
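The cleanup ordering suggested in this comment can be sketched as follows. This is a minimal illustration rather than the PR's code: `CleanupSketch`, `closeAll`, and the two `AutoCloseable` parameters are hypothetical stand-ins for `ddlExecutor` and `partitionClientPool`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the close() path discussed above; names are illustrative only.
final class CleanupSketch {

  // Closes the executor first, then always closes the pool, even if the executor's close() throws.
  static void closeAll(AutoCloseable ddlExecutor, AutoCloseable partitionClientPool) throws Exception {
    try {
      ddlExecutor.close();
    } finally {
      if (partitionClientPool != null) {
        partitionClientPool.close();
      }
    }
  }

  public static void main(String[] args) {
    List<String> events = new ArrayList<>();
    try {
      closeAll(
          () -> { throw new IllegalStateException("executor close failed"); },
          () -> events.add("pool closed"));
    } catch (Exception e) {
      // The executor's failure still reaches the caller after the pool has been closed.
      events.add("caught: " + e.getMessage());
    }
    System.out.println(events);
  }
}
```

One caveat with this shape: if the pool's own `close()` also throws inside the `finally`, that exception replaces the executor's failure, so a real implementation may want to log or suppress one of the two.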
// SessionState is shared across workers; build it once on the first call (with
// currentDatabase already set) and attach it to each worker's thread-local on
// subsequent calls. The database is a pool-wide property and never changes
// across workers, so setting it once at construction time is sufficient.
🤖 The shared SessionState pattern here makes me a bit nervous — every worker thread calls SessionState.start(sharedSessionState) and then runs concurrent Driver.run() against the same instance. Hive's SessionState has plenty of mutable fields the Driver writes during query execution (e.g. currentDatabase via USE, hiveOperation, cmd, lineageState, tablesAlreadyMarkedAsRead). Are we confident none of these get written concurrently by parallel ALTER PARTITION / TOUCH statements? @yihua could you weigh in on whether sharing SessionState across worker threads is safe in practice, or whether each worker should construct its own (paying the SessionState init cost in exchange for isolation)?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
🤖 Sharing one SessionState across worker threads is risky here. Hive stores the active SessionState in a ThreadLocal (SessionState.start() sets tss.set(...)), so this pattern installs the same mutable instance into N threads' ThreadLocals. The Hive Driver isn't documented as thread-safe, and Driver execution mutates session-scoped state — currentDatabase (USE), conf overrides, and the compile/execute lineage/read-entity sets — so concurrent ALTER PARTITION/TOUCH runs can interleave on those fields. Worth confirming whether any of the batched statements can issue a USE or set conf; if so, per-worker SessionState (or serializing the session-mutating bits) is the safer default even at the init cost.
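For comparison, the per-worker alternative mentioned above could look roughly like the sketch below. It deliberately avoids Hive classes: `FakeSession` and `PerWorkerSessionSketch` are hypothetical, and the only point is that a `ThreadLocal` initializer gives each worker thread its own mutable session object instead of installing one shared instance everywhere.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a mutable, session-scoped object; this is not Hive's SessionState.
final class FakeSession {
  String currentDatabase;

  FakeSession(String currentDatabase) {
    this.currentDatabase = currentDatabase;
  }
}

final class PerWorkerSessionSketch {
  // Each thread lazily builds its own FakeSession, so no mutable session is shared across workers.
  private static final ThreadLocal<FakeSession> SESSION =
      ThreadLocal.withInitial(() -> new FakeSession("default_db"));

  // Runs one task per worker thread and returns how many distinct session objects were observed.
  static int distinctSessions(int workers) {
    Set<FakeSession> seen = ConcurrentHashMap.newKeySet();
    CountDownLatch allStarted = new CountDownLatch(workers);
    ExecutorService pool = Executors.newFixedThreadPool(workers);
    for (int i = 0; i < workers; i++) {
      pool.submit(() -> {
        allStarted.countDown();
        try {
          // Hold every worker until all have started, so no task can reuse another task's thread.
          allStarted.await();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        seen.add(SESSION.get());
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return seen.size();
  }
}
```

The trade-off is the one the comment names: every worker pays its own initialization cost, in exchange for not having to reason about which session fields are written during concurrent execution.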
for (List<String> batch : batches) {
  futures.add(partitionClientPool.executor().submit(() ->
      partitionClientPool.run(poolClient -> {
        action.apply(poolClient, batch);
🤖 When a Future fails, f.get() throws ExecutionException wrapping the real cause. firstError then stores the ExecutionException, which gets rethrown and caught by the outer catch (Exception e) in addPartitionsToTable/dropPartitionsToTable/registerAlterPartitionEvent, which wraps it again in HoodieHiveSyncException. The result is HoodieHiveSyncException -> ExecutionException -> RealCause — one more wrapping layer than the sequential path produced. HiveDriverPool.awaitAll already has an unwrap() helper; could we do the same here for consistency?
- AI-generated; verify before applying. React 👍/👎 to flag quality.
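A minimal sketch of the unwrapping idea follows, assuming nothing about how `HiveDriverPool.unwrap()` is actually written. `UnwrapSketch`, `unwrap`, and `failureOf` are hypothetical names; the sketch only shows how the `ExecutionException` layer added by `Future.get()` can be peeled off before the error is stored or rethrown.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper; the real unwrap() in the PR may differ.
final class UnwrapSketch {

  // Returns the underlying cause of an ExecutionException, or the throwable itself otherwise.
  static Throwable unwrap(Throwable t) {
    if (t instanceof ExecutionException && t.getCause() != null) {
      return t.getCause();
    }
    return t;
  }

  // Waits for one future and returns its unwrapped failure, or null if it completed normally.
  static Throwable failureOf(Future<?> future) {
    try {
      future.get();
      return null;
    } catch (ExecutionException e) {
      return unwrap(e);
    } catch (InterruptedException e) {
      // Restore the interrupt flag and report the interruption as the failure.
      Thread.currentThread().interrupt();
      return e;
    }
  }
}
```

With a helper like this, the outer `catch (Exception e)` would wrap the real cause directly, so the exception chain has the same depth as on the sequential path.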
// Present only when HIVE_SYNC_BATCHING_ENABLED and sync mode is HIVEQL (explicit
// or legacy default). Owned by HiveQueryDDLExecutor; this field is kept for
// reference only — close() is delegated through ddlExecutor.close().
private Option<HiveDriverPool> partitionDriverPool = Option.empty();
🤖 nit: the comment says this field is 'kept for reference only — close() is delegated through ddlExecutor.close()', which means it's never read after being passed into HiveQueryDDLExecutor. Could you make it a local variable in the constructor instead? A field that isn't read after construction will make future readers wonder what lifecycle role it plays.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
 * semantics, so each worker still iterates its chunk one partition at a time — the
 * win is fanning chunks across independent Thrift clients.
 */
private void runDropBatches(String tableName, List<List<String>> batches) throws Exception {
🤖 nit: the parallel-path error-collection loop inside runDropBatches (Exception firstError = null; for (Future<Void> f : futures) …) is nearly identical to the one in HMSDDLExecutor.runBatches. Since HiveDriverPool already has a well-factored awaitAll(List<Future<?>>) for exactly this pattern, it might be worth adding a similar awaitAll(List<Future<Void>>) to IMetaStoreClientPool so both callers can delegate to it instead.
- AI-generated; verify before applying. React 👍/👎 to flag quality.
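A shared helper of the kind proposed here might look like the sketch below. It is an assumption, not the signature of any existing `awaitAll`: `AwaitAllSketch` is a hypothetical class, and where the PR logs later failures at WARN, the sketch attaches them as suppressed exceptions so it stays free of logging dependencies.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical shared helper for the "first error wins" loop described above.
final class AwaitAllSketch {

  // Waits for every future; rethrows the first failure and keeps later ones as suppressed.
  static void awaitAll(List<? extends Future<?>> futures) throws Exception {
    Exception firstError = null;
    for (Future<?> future : futures) {
      try {
        future.get();
      } catch (ExecutionException e) {
        // Prefer the real cause over the ExecutionException wrapper when it is an Exception.
        Throwable cause = e.getCause();
        Exception failure = cause instanceof Exception ? (Exception) cause : e;
        if (firstError == null) {
          firstError = failure;
        } else if (failure != firstError) {
          // The PR logs these at WARN; this sketch records them on the first error instead.
          firstError.addSuppressed(failure);
        }
      }
    }
    if (firstError != null) {
      throw firstError;
    }
  }
}
```

Because the loop keeps calling `get()` after the first failure, every submitted batch has finished by the time the first error is rethrown, which matters if the caller closes the pool immediately afterwards.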
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #19033      +/-   ##
============================================
- Coverage     68.26%   67.65%   -0.62%
- Complexity    29513    29865     +352
============================================
  Files          2542     2564      +22
  Lines        142627   145490    +2863
  Branches      17788    18371     +583
============================================
+ Hits          97369    98428    +1059
- Misses        37253    38828    +1575
- Partials       8005     8234     +229
Adds an opt-in pool that splits HiveQL partition DDL (add/update/touch/drop) into batches of `hoodie.datasource.hive_sync.batch_num` and dispatches them in parallel across a pool of single-thread workers. Each worker owns its own Hive `Driver` + `SessionState` (both thread-bound in Hive 2.x), so the fan-out is implemented as a fixed pool of dedicated single-thread executors rather than a shared thread pool.

Table-level operations (create/alter table, last commit time, writer version) continue to use the single session `Driver`. Partition-phase SQL lists run through the pool only when `hoodie.datasource.hive_sync.batching.enabled` is set to true; default off, existing behavior unchanged.

Hive 2.x's `ALTER PARTITION SET LOCATION` ignores db.table qualifiers and uses the connection's current database, so each worker is primed with the correct USE statement before any partition ALTER is dispatched.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
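The "fixed pool of dedicated single-thread executors" described in this commit message can be illustrated with the sketch below. It is not the PR's `HiveDriverPool`: `DedicatedWorkerPoolSketch` and `dispatch` are hypothetical, the round-robin assignment is an assumption, and the batch contents are ignored so the example can focus on which thread runs each batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration of one single-thread executor per worker slot.
final class DedicatedWorkerPoolSketch implements AutoCloseable {
  private final List<ExecutorService> workers = new ArrayList<>();

  DedicatedWorkerPoolSketch(int size) {
    for (int i = 0; i < size; i++) {
      // Every task sent to worker i runs on the same thread, which a thread-bound resource needs.
      workers.add(Executors.newSingleThreadExecutor());
    }
  }

  // Dispatches batches round-robin and returns the name of the thread that handled each batch.
  List<String> dispatch(List<List<String>> batches) {
    List<Future<String>> futures = new ArrayList<>();
    for (int i = 0; i < batches.size(); i++) {
      ExecutorService worker = workers.get(i % workers.size());
      futures.add(worker.submit(() -> Thread.currentThread().getName()));
    }
    List<String> threadNames = new ArrayList<>();
    for (Future<String> future : futures) {
      try {
        threadNames.add(future.get());
      } catch (Exception e) {
        throw new IllegalStateException("batch failed", e);
      }
    }
    return threadNames;
  }

  @Override
  public void close() {
    workers.forEach(ExecutorService::shutdown);
  }
}
```

A shared thread pool would not give this guarantee: the same logical worker could run on different threads over time, which is a problem when the resource it owns is bound to the thread that created it.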
Follow-up to the HiveQL partition batching introduced in the prior commit (PR apache#18984). HiveQueryDDLExecutor.dropPartitionsToTable goes through IMetaStoreClient.dropPartition (Thrift), not the Hive Driver, so it can't reuse HiveDriverPool. This change wires an IMetaStoreClientPool into HiveQueryDDLExecutor and uses it to fan drop batches across the pool's worker threads.

Behavior:
- batching.enabled=false (default): unchanged. dropPartitionsToTable iterates the partition list sequentially on the session metastore client, exactly as before.
- batching.enabled=true: partitions are split into batches of HIVE_BATCH_SYNC_PARTITION_NUM, batches fan out across the pool's workers (one IMetaStoreClient per worker), and first-error semantics match the existing HMS-mode implementation (first failure thrown, subsequent failures logged at WARN).

HoodieHiveSyncClient now builds partitionClientPool for the HIVEQL mode branches (explicit `mode=hiveql` and the legacy default-mode branch). The pool is closed in HoodieHiveSyncClient.close() before Hive.closeCurrent() so the RetryingMetaStoreClient instances release their Thrift sockets without racing the ThreadLocal Hive cleanup.

IMetaStoreClientPool is brought in as part of this PR. An analogous parallelization for HMS sync mode (separately tracked) would introduce the same class; whichever lands first owns the file going forward.

Tests:
- New end-to-end test: testHiveQLDropPartitionsWithBatching creates 8 partitions, drops 4 through the parallel pool path with threads=3 and batch_num=2 (so multiple drop batches race), and asserts the remaining partition set matches.

Configs: no new configs. Reuses hoodie.datasource.hive_sync.batching.* from apache#18984.

Related: apache#18331

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
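The two mechanics this commit message relies on, splitting the partition list into batches and borrowing a client from a bounded pool, can be sketched as below. `BorrowPoolSketch` is hypothetical and generic over the client type, so it makes no claim about the real `IMetaStoreClientPool` API beyond the borrow-then-return shape.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;

// Hypothetical bounded pool: clients sit in a blocking queue and are borrowed per call.
final class BorrowPoolSketch<C> {
  private final BlockingQueue<C> free;

  BorrowPoolSketch(List<C> clients) {
    this.free = new ArrayBlockingQueue<>(clients.size(), false, clients);
  }

  // Borrows a client, applies fn, and always returns the client to the free list.
  <R> R run(Function<C, R> fn) {
    C client;
    try {
      client = free.take(); // blocks while all clients are in use
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for a client", e);
    }
    try {
      return fn.apply(client);
    } finally {
      free.add(client);
    }
  }

  // Splits a list into consecutive batches of at most batchSize elements.
  static <T> List<List<T>> batches(List<T> items, int batchSize) {
    List<List<T>> out = new ArrayList<>();
    for (int i = 0; i < items.size(); i += batchSize) {
      out.add(new ArrayList<>(items.subList(i, Math.min(i + batchSize, items.size()))));
    }
    return out;
  }
}
```

With the numbers from the test description, dropping 4 partitions at a batch size of 2 yields two batches, which is why that test exercises more than one batch running concurrently.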
7022e7d to 5e3c1fd
hudi-agent
left a comment
Thanks for working on this! This PR extends the opt-in HiveQL batching to partition drops by fanning batches across an IMetaStoreClientPool, and routes ADD/UPDATE/TOUCH through a runSQLs hook backed by HiveDriverPool. The concurrency on the drop path itself looks sound (disjoint batches, partition-row-only ops, thread-safe DateTimeFormatter-based extractors). One error-path resource-leak edge case is worth double-checking in the inline comment. There are also a couple of small naming and consistency nits below; otherwise the code is clean and well-documented. Please take a look at the inline comments, and this should be ready for a Hudi committer or PMC member to take it from here.
  break;
case HIVEQL:
  ddlExecutor = new HiveQueryDDLExecutor(config, this.client);
  this.partitionDriverPool = maybeBuildHiveDriverPool(config);
🤖 These two pools are built eagerly here, but if maybeBuildPartitionClientPool(...) or the HiveQueryDDLExecutor constructor on the next line throws, the outer catch (line ~148) just rethrows — and since the constructor failed, close() is never called, so the already-built pool's worker threads + Thrift connections leak. Could we close partitionDriverPool/partitionClientPool in the catch before rethrowing? Same pattern in the legacy default-mode branch below.
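The close-before-rethrow pattern requested here can be sketched in isolation as follows. `ConstructorCleanupSketch` and `buildExecutor` are hypothetical, and the two factory parameters stand in for the real pool and executor builders; the sketch only shows that a resource built in an earlier step is closed when a later step throws.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical illustration of cleaning up an already-built pool when a later step fails.
final class ConstructorCleanupSketch {

  // Builds the pool, then the executor; closes the pool if building the executor throws.
  static <E> E buildExecutor(Supplier<AutoCloseable> poolFactory,
                             Function<AutoCloseable, E> executorFactory) {
    AutoCloseable pool = poolFactory.get();
    try {
      return executorFactory.apply(pool);
    } catch (RuntimeException e) {
      try {
        pool.close();
      } catch (Exception closeError) {
        // Keep the original failure primary; record the cleanup failure alongside it.
        e.addSuppressed(closeError);
      }
      throw e;
    }
  }
}
```

Since `close()` is never invoked on an object whose constructor failed, this kind of local cleanup is the only place the already-started worker threads and connections can be released.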
// this class; closed in close() before Hive.closeCurrent(). HiveQueryDDLExecutor
// uses it only for DROP (Hive Thrift, not Hive Driver) — see IMetaStoreClientPool
// javadoc.
private IMetaStoreClientPool partitionClientPool;
🤖 nit: partitionClientPool is a raw nullable while partitionDriverPool just below (line 101) is wrapped in Option<> — both represent the same "batching enabled" condition, so it might be worth making the container type consistent (either both Option<>, or both nullable with the same guard pattern) to avoid a future reader wondering if there's a semantic difference.
public static final ConfigProperty<Integer> HIVE_SYNC_BATCHING_THREADS = ConfigProperty
    .key("hoodie.datasource.hive_sync.batching.threads")
    .defaultValue(4)
    .markAdvanced()
🤖 nit: the doc says "Pool size (number of worker resources) and worker-thread count" — could you simplify to something like "Number of worker threads (and pooled metastore clients) for parallel partition sync when hoodie.datasource.hive_sync.batching.enabled is true"? The current wording lists "pool size" and "worker-thread count" as if they're distinct, but they're the same number.

Describe the issue this Pull Request addresses
Closes #18331 (continued).
The HiveQL sync mode's `dropPartitionsToTable` walks the partition list one element at a time on the session `IMetaStoreClient` (Thrift). On tables with thousands of partitions, a single sync that needs to drop a non-trivial subset becomes a multi-minute serial Thrift loop.

The companion PR #18984 introduced parallel ADD/UPDATE/TOUCH for HiveQL via `HiveDriverPool`. DROP can't reuse that pool because it goes through `IMetaStoreClient.dropPartition`, not the Hive `Driver`. This PR adds the analogous Thrift-side fan-out so the opt-in batching flag covers all four partition operations end-to-end.

This PR is built on top of #18984. Once #18984 merges to master, this PR rebases trivially.
Summary and Changelog
When `hoodie.datasource.hive_sync.batching.enabled=true` and sync mode is HIVEQL, partition drops are split into batches of `hoodie.datasource.hive_sync.batch_num` and dispatched across a pool of worker threads, each holding its own `IMetaStoreClient`. Default off; existing behavior is unchanged.

Changes:
- `HiveQueryDDLExecutor.dropPartitionsToTable`: previously iterated the partition list sequentially on the session metastore client. Now splits into batches of `HIVE_BATCH_SYNC_PARTITION_NUM` and either runs them sequentially on the session client (default) or fans them across the pool (when batching is enabled). First-error semantics: first failure thrown, subsequent failures logged at WARN. Each worker iterates its chunk one partition at a time. Hive has no batch-drop primitive matching `dropPartition`'s semantics, so the win comes from running chunks across independent Thrift clients in parallel.
- `HoodieHiveSyncClient`: builds an `IMetaStoreClientPool` for the HIVEQL mode branches (`mode=hiveql` and the legacy default-mode branch) when batching is enabled. The pool is closed in `close()` before `Hive.closeCurrent()` so the `RetryingMetaStoreClient` instances release their Thrift sockets without racing the `ThreadLocal` Hive cleanup. Falls back to single-client sequential drop when `hoodie.datasource.hive_sync.use_spark_catalog=true` (the Spark catalog client is constructed reflectively and isn't compatible with the direct `RetryingMetaStoreClient` pool path).
- `util/IMetaStoreClientPool`: new, ~207 lines. Bounded blocking pool of `RetryingMetaStoreClient` instances backed by a fixed-size `ExecutorService`. Workers borrow a client for the duration of the function (`pool.run(Function)`), and the pool releases it back to the slot's free list on return. Unlike `HiveDriverPool` (which is constrained by Hive 2.x's thread-bound `Driver`/`SessionState`), `IMetaStoreClient` is a Thrift socket, so the pool is a plain `ArrayBlockingQueue` + `ExecutorService` with no per-slot dedicated thread required.

Design constraint note: DROP and ADD/UPDATE/TOUCH live on opposite concurrency models in HiveQL mode. ADD/UPDATE/TOUCH go through the Hive `Driver` (thread-bound, `SessionState` is `ThreadLocal`) and use `HiveDriverPool`. DROP goes through the Thrift client and uses `IMetaStoreClientPool`. The two pools are constructed only when `batching.enabled=true` and torn down together in `HoodieHiveSyncClient.close()`.

Impact
User-facing impact is opt-in only.
- `hoodie.datasource.hive_sync.batching.enabled=false`: DROP runs exactly as it does today (single client, sequential).
- No new configs: reuses `hoodie.datasource.hive_sync.batching.enabled`, `hoodie.datasource.hive_sync.batching.threads`, and `hoodie.datasource.hive_sync.batch_num` introduced by feat(hive-sync): batch and parallelize HiveQL partition operations #18984.

Risk Level
Medium.
Mitigation:
- `testHiveQLDropPartitionsWithBatching` creates 8 partitions and drops 4 through the parallel pool with `threads=3` and `batch_num=2` (so multiple drop batches race against each other on independent Thrift clients) and asserts the remaining partition set.
- With `batching.enabled=false`, the new `runDropBatches` helper falls through to a sequential `applyDropBatch` against the session client, which is semantically equivalent to the prior loop.

Documentation Update
No new configs; no docsite change required.
Contributor's checklist