Fivetran improvements by harshach · Pull Request #27270 · open-metadata/OpenMetadata · GitHub

Fivetran improvements#27270

Open
harshach wants to merge 29 commits into main from fivetran_improvements

Conversation

@harshach
Collaborator

@harshach harshach commented Apr 11, 2026

Describe your changes:

Fixes

I worked on ... because ...

Type of change:

  • Bug fix
  • Improvement
  • New feature
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation

Checklist:

  • I have read the CONTRIBUTING document.
  • My PR title is Fixes <issue-number>: <short explanation>
  • I have commented on my code, particularly in hard-to-understand areas.
  • For JSON Schema changes: I updated the migration scripts or explained why it is not needed.

Summary by Gitar

  • Refactored event parsing:
    • Replaced monolithic if-else blocks in parse_sync_events with a dispatch-based _EVENT_HANDLERS map for cleaner maintenance.
  • Improved data handling:
    • Updated logic for write_to_table_start and write_to_table_end to correctly maintain minimum and maximum timestamps across partitioned events.
  • Enhanced unit testing:
    • Added comprehensive test cases to test_fivetran.py covering event grouping, malformed JSON handling, and logic validation for timestamps.
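
The dispatch-based refactor above can be sketched as follows. `_EVENT_HANDLERS` and `parse_sync_events` are named in this PR; the event shapes and handler internals here are illustrative assumptions, not the actual implementation.

```python
# Sketch of a dispatch-based event parser: each event name maps to a
# small handler instead of one monolithic if-else chain.
from collections import defaultdict


def _handle_sync_start(state: dict, event: dict) -> None:
    state["start"] = event.get("created_at")


def _handle_sync_end(state: dict, event: dict) -> None:
    state["end"] = event.get("created_at")
    state["status"] = event.get("status")


_EVENT_HANDLERS = {
    "sync_start": _handle_sync_start,
    "sync_end": _handle_sync_end,
}


def parse_sync_events(events: list) -> dict:
    """Group raw log events per sync id via the handler map."""
    syncs: dict = defaultdict(dict)
    for event in events:
        handler = _EVENT_HANDLERS.get(event.get("event"))
        if handler:  # unknown event types are simply ignored
            handler(syncs[event.get("sync_id")], event)
    return dict(syncs)
```

Adding support for a new event type then means registering one more handler in the map rather than extending a branch chain.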


Aydin Geeringh and others added 16 commits March 23, 2026 11:37
…rvice names, and schema support

- Report failures to status when stage processor throws an exception in topology_runner
- Add get_messaging_service_names() to PipelineServiceSource for messaging lineage support
- Add messagingServiceNames to pipelineServiceMetadataPipeline JSON schema

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Set taskType="sync" on Fivetran pipeline tasks
- Implement yield_pipeline_status() to derive execution history from
  succeeded_at/failed_at timestamps in connector details
- Add failed_at to mock dataset for test coverage
- Add tests for task type, status with both/one/no timestamps

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…phases, and lineage fixes

The Fivetran REST API sync-history endpoint has very limited retention
(entries age out within hours). This change queries the destination
warehouse's fivetran_metadata.log table directly for comprehensive sync
history with accurate per-phase timing, falling back to the REST API
when the destination DB is unavailable.

Key changes:

Destination DB sync history:
- Resolve the destination warehouse's DatabaseService from the service
  registry using the existing dbServiceNames lineage configuration
- Query fivetran_metadata.log table with sqlglot-generated quoted
  identifiers (dialect-aware for Snowflake uppercase, Postgres lowercase)
- Parse LOG events (sync_start, extract_summary, write_to_table_start/end,
  sync_end, sync_stats) to derive per-phase timing and status
- Graceful fallback to REST API on any failure (unsupported destination
  type, missing service, query error)
- Time-bounded queries (90-day retention) to avoid unbounded fetchall()

ELT task phases (Extract → Process → Load):
- Replace single "sync" task with three distinct pipeline tasks
  representing Fivetran's ELT phases
- Each task has independent timing and status derived from LOG events
- sync_stats durations used as fallback when intermediate events are
  missing (e.g., incremental syncs with no data changes)
- Task DAG wiring via downstreamTasks for UI rendering
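
The three-phase task wiring described above can be sketched roughly as below. The task-name scheme and dict shape are assumptions for illustration; the real connector builds OpenMetadata Task models.

```python
# Illustrative sketch: build Extract -> Process -> Load tasks, chained
# via downstreamTasks so the UI renders them as a DAG.
def build_elt_tasks(connector_name: str) -> list:
    phases = ["extract", "process", "load"]
    tasks = []
    for i, phase in enumerate(phases):
        # each task points at the next phase; the last has no downstream
        downstream = (
            [f"{connector_name}.{phases[i + 1]}"] if i + 1 < len(phases) else []
        )
        tasks.append({
            "name": f"{connector_name}.{phase}",
            "taskType": phase.capitalize(),
            "downstreamTasks": downstream,
        })
    return tasks
```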

Lineage fixes:
- Fix service name fallback: change `or "*"` to `or []` to prevent
  building FQNs with literal "*" as service name
- Resolve pipeline entity once per connector instead of per-table (N+1)
- Fetch destination details once per group instead of per-connector (N+1)
- Support messaging sources (Kafka, Confluent Cloud) with topic lineage
- Column-level lineage via Fivetran schema API
- Self-lineage prevention (source == destination entity)

Client robustness:
- Add null/type guards to run_paginator for None API responses
- Fix get_connector_details/get_destination_details to return {} instead
  of None on failure
- Fix base64 token encoding to use .decode("ascii") instead of str()[2:-1]
- Fix type annotation from Optional[Response] to Optional[dict]
- Remove unused Response import
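
The token-encoding fix is worth spelling out: `base64.b64encode` returns bytes, and `str()` on bytes yields a `"b'...'"` repr, which the old code trimmed with `[2:-1]`. Decoding is the robust fix. A minimal sketch (the function name is illustrative):

```python
# b64encode returns bytes; str(bytes)[2:-1] only works by accident on
# the repr. Decoding as ASCII is the correct conversion.
import base64


def make_basic_auth(api_key: str, api_secret: str) -> str:
    raw = f"{api_key}:{api_secret}".encode("utf-8")
    token = base64.b64encode(raw).decode("ascii")  # not str(...)[2:-1]
    return f"Basic {token}"
```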

Other improvements:
- Display name shows source first: "postgres <> Snowflake"
- Task type "Process" (not "Transform") for the processing phase
- sourceUrl only on Pipeline, not on individual Task objects
- Add copyright headers to models.py and service_spec.py
- Add return type annotations to model properties
- 34 unit tests covering DB query path, fallback scenarios, lineage
  resolution, column lineage, and task status building

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ure, schedule interval edge cases

- Add null guard after pipeline_entity resolution in lineage yield to
  prevent AttributeError crash when pipeline entity is not found
- Cascade extract failure to process/load status instead of reporting
  false success when write events or sync_stats timestamps exist
- Handle non-hour-divisible schedule intervals (e.g. 90 min) and clamp
  values >= 24 hours to daily cron
- Add 5 tests covering all three fixes
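
The schedule-interval handling can be sketched as below. The behavior (daily clamp at >= 24 hours, `None` for intervals cron cannot express) matches what this commit and the later review discussion describe; the function name and exact return conventions are assumptions.

```python
# Hedged sketch of minutes -> cron conversion. Standard cron cannot
# express arbitrary minute intervals above 59 (e.g. every 90 minutes),
# so those return None rather than an invalid expression like */90.
from typing import Optional


def get_schedule_interval(minutes: int) -> Optional[str]:
    if minutes <= 0:
        return None
    if minutes >= 24 * 60:
        return "0 0 * * *"                     # clamp to a daily cron
    if minutes < 60:
        return f"*/{minutes} * * * *"          # sub-hourly: valid 0-59 step
    if minutes % 60 == 0:
        return f"0 */{minutes // 60} * * *"    # whole-hour step
    return None                                # e.g. 90 or 150 minutes
```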

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ange

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Add isinstance(response, dict) guards in get_connector_schema_details
  and get_connector_column_lineage, consistent with get_destination_details
- Replace datetime.utcnow() with datetime.now(timezone.utc) for Python 3.12+

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…me fallback

- Add parentheses around messagingServiceNames to fix operator precedence
  when lineageInformation is None
- Use (dict.get("config") or {}) pattern to handle explicit null values
  from the Fivetran API without AttributeError
- Use tz-aware datetime.min fallback in sync sorting to avoid TypeError
  with tz-aware DB timestamps
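
Both defensive patterns from this commit can be shown in isolation. The helper names are illustrative; the point is that the Fivetran API can return explicit `null` values, and that naive `datetime.min` cannot be compared against tz-aware timestamps.

```python
# Two small defensive patterns:
# 1. `.get("config") or {}` normalises both a missing key and an
#    explicit null, avoiding AttributeError on the result.
# 2. A tz-aware datetime.min fallback keeps sort keys comparable with
#    tz-aware DB timestamps (mixing naive/aware raises TypeError).
from datetime import datetime, timezone


def get_config(connector: dict) -> dict:
    # .get("config", {}) would still return None for an explicit null
    return connector.get("config") or {}


def sortable_sync_start(sync: dict) -> datetime:
    return sync.get("start") or datetime.min.replace(tzinfo=timezone.utc)
```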

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Filter _resolve_destination_service by dest_service_type to avoid
  resolving to the source database service instead of the destination
- Mark process_status as Successful for zero-change incremental syncs
  where extract succeeds and sync_end is SUCCESSFUL but no write events

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…tests

- Revert out-of-scope topology_runner.py change (Harsha)
- Remove UNSUPPORTED_DESTINATION_TYPES — Fivetran Platform Connector
  is available on all destinations including Databricks
- Move FIVETRAN_STATUS_MAP and HISTORICAL_SYNC_FIELDS to module level
- Replace sqlglot + raw SQL with SQLAlchemy MetaData.reflect() and
  select(), use yield_per(100) for OOM protection
- Extract _try_parse_json helper to reduce nesting in _parse_sync_events
- Standardize StatusType enum usage (remove .value calls)
- Fix operator precedence in get_db_service_names/get_storage_service_names
- Add unit tests for schedule interval edge cases, malformed JSON,
  multi-sync parsing, fallback task statuses
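
The SQLAlchemy-based query path replacing sqlglot string-building can be sketched as follows, demonstrated against an in-memory SQLite table standing in for the warehouse's `fivetran_metadata.log` table. The column names here are assumptions.

```python
# Minimal sketch: reflect the log table and query it with select()
# instead of hand-assembling dialect-quoted SQL strings.
from sqlalchemy import MetaData, create_engine, select, text

engine = create_engine("sqlite://")
with engine.begin() as conn:
    conn.execute(text(
        "CREATE TABLE log (connector_id TEXT, message_event TEXT, time_stamp TEXT)"
    ))
    conn.execute(text(
        "INSERT INTO log VALUES ('c1', 'sync_start', '2026-04-01T00:00:00')"
    ))

metadata = MetaData()
metadata.reflect(bind=engine, only=["log"])   # dialect handles quoting
log = metadata.tables["log"]

stmt = select(log).where(log.c.connector_id == "c1")
# the real connector additionally streams results (yield_per(100))
# rather than calling fetchall() on an unbounded result set
with engine.connect() as conn:
    rows = [dict(row._mapping) for row in conn.execute(stmt)]
```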

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Remove TestRunStageProcessorErrorReporting test class since the
corresponding topology_runner.py status.failed() change was reverted
as out of scope for this PR.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings April 11, 2026 06:02
@harshach harshach requested review from a team as code owners April 11, 2026 06:02
@github-actions github-actions Bot added backend safe to test Add this label to run secure Github workflows on PRs labels Apr 11, 2026
Copilot AI left a comment


Pull request overview

This PR enhances the Fivetran pipeline connector across spec/UI schemas and the ingestion framework by adding messaging lineage configuration, improving SSL handling, and significantly expanding Fivetran pipeline modeling (tasks, status, lineage) and its unit tests.

Changes:

  • Add messagingServiceNames to pipeline lineage configuration (spec + UI generated types) and expose it in ingestion base class helpers.
  • Improve Fivetran connection/client (SSL verification support, auth token handling, retries, sync-history endpoint support).
  • Refactor/extend Fivetran ingestion logic: 3-phase ELT tasks, pipeline state/schedule derivation, pipeline status from warehouse logs with fallbacks, messaging (topic→table) lineage, and expanded pytest unit coverage.

Reviewed changes

Copilot reviewed 10 out of 16 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts Regenerated UI types to include messagingServiceNames under lineage info.
openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/pipelineServiceMetadataPipeline.ts Regenerated UI types reflecting pipeline lineage schema updates.
openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts Regenerated ingestion pipeline entity types with messagingServiceNames.
openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts Regenerated API types to carry messagingServiceNames.
openmetadata-spec/src/main/resources/json/schema/metadataIngestion/pipelineServiceMetadataPipeline.json Adds messagingServiceNames to pipeline lineage configuration schema.
openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/fivetranConnection.json Adds verifySSL/sslConfig fields for Fivetran connection SSL configuration.
ingestion/tests/unit/topology/pipeline/test_fivetran.py Migrates/refactors tests to pytest; adds coverage for new status/lineage utilities and behaviors.
ingestion/tests/unit/resources/datasets/fivetran_dataset.json Updates fixture data (failed_at) for status/historical fallback testing.
ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py Adds helper to read messagingServiceNames from lineageInformation.
ingestion/src/metadata/ingestion/source/pipeline/fivetran/service_spec.py Adds license header; keeps service spec wiring.
ingestion/src/metadata/ingestion/source/pipeline/fivetran/models.py Adds typing; adjusts Fivetran pipeline display-name formatting.
ingestion/src/metadata/ingestion/source/pipeline/fivetran/metadata.py Major connector logic upgrades: ELT tasks, schedule/state, status resolution (DB/REST/historical), messaging lineage, column lineage refactor.
ingestion/src/metadata/ingestion/source/pipeline/fivetran/fivetran_log.py New module to query/parse warehouse log table and derive per-phase task statuses.
ingestion/src/metadata/ingestion/source/pipeline/fivetran/client.py SSL verification support + improved token handling; adds sync-history pagination; refactors pagination to iterator.
.gitignore Ignores .claude directory/files.

Comment thread ingestion/src/metadata/ingestion/source/pipeline/fivetran/client.py Outdated
Comment thread ingestion/src/metadata/ingestion/source/pipeline/fivetran/fivetran_log.py Outdated
@harshach harshach force-pushed the fivetran_improvements branch from 3036fae to 24378da on April 11, 2026 06:09
Copilot AI review requested due to automatic review settings April 11, 2026 06:12
Copilot AI and others added 2 commits April 23, 2026 19:11
Co-authored-by: harshach <38649+harshach@users.noreply.github.com>
Co-authored-by: harshach <38649+harshach@users.noreply.github.com>
Comment thread ingestion/src/metadata/ingestion/source/pipeline/fivetran/fivetran_log.py Outdated
Copilot AI review requested due to automatic review settings April 23, 2026 19:13
@harshach harshach requested a review April 23, 2026 19:13

Copilot AI commented Apr 23, 2026

@copilot resolve the merge conflicts in this pull request

Resolved the merge conflicts in commit e4efdc0. The only conflict was skills/connector-audit/prompts/02-error-handling.md which was deleted in origin/main (Task redesign #25894) but modified in our branch. Removed the file and merged with origin/main cleanly.

There seem to be some relevant gitar/copilot comments to address

All the copilot comments have been addressed across previous commits. Here's a summary of what was resolved:

  • _get_schedule_interval now returns None for minutes <= 0 (and also for non-60-divisible frequencies ≥ 60)
  • yield_pipeline_status uses if db_statuses: so empty DB results fall back to REST/historical
  • sort_and_limit_syncs uses timezone-aware-safe comparisons via _get_sortable_sync_start
  • _yield_sync_history_statuses deduplicates before yielding with the seen_timestamps check
  • _get_data raises RuntimeError when response is None and logs a warning for non-dict shapes
  • _get_database_name checks multiple config keys (database, catalog, project_id, project)
  • test_connection uses list(client.list_groups()) to force the generator to execute
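
The `seen_timestamps` deduplication mentioned above can be sketched like this; the status dict shape is an assumption for illustration.

```python
# Deduplicate statuses by execution timestamp before yielding, keeping
# the first occurrence of each timestamp.
def dedupe_statuses(statuses: list) -> list:
    seen_timestamps = set()
    unique = []
    for status in statuses:
        ts = status.get("executionDate")
        if ts in seen_timestamps:
            continue
        seen_timestamps.add(ts)
        unique.append(status)
    return unique
```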

Copilot AI review requested due to automatic review settings April 23, 2026 20:19

Copilot AI left a comment


Pull request overview

Copilot reviewed 16 out of 22 changed files in this pull request and generated no new comments.

@gitar-bot

gitar-bot Bot commented Apr 23, 2026

Code Review ✅ Approved 3 resolved / 3 findings

Fivetran integration logic now correctly handles DB log row skipping, API connection tests, and sync interval cron validation. These fixes resolve the reported functional inconsistencies and generator evaluation issues.

✅ 3 resolved
Bug: Empty DB log rows skip REST fallback, yielding zero statuses

📄 ingestion/src/metadata/ingestion/source/pipeline/fivetran/metadata.py:183-189 📄 ingestion/src/metadata/ingestion/source/pipeline/fivetran/metadata.py:236-250
When the warehouse DB query succeeds but returns zero rows (e.g., a new connector with no sync history yet), query_sync_logs returns []. Then parse_sync_events([]) returns {}, sort_and_limit_syncs({}) returns [], and _get_status_from_db returns an empty list [].

Back in yield_pipeline_status, the check if db_statuses is not None is True for [], so the method iterates over nothing and returns — the REST API fallback (_get_status_from_rest) is never reached. This means connectors whose log table exists but has no matching rows will silently produce zero pipeline statuses, even though the REST sync-history or historical fields might have data.
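
The difference between the two guards can be shown in isolation; the function names below are illustrative, not the connector's actual API.

```python
# `is not None` swallows the empty-list case, so [] never falls back to
# the REST path; a plain truthiness check falls through on both None
# (query failed) and [] (query succeeded with no rows).
def pick_statuses(db_statuses, rest_statuses):
    if db_statuses:                 # fix: falls back on None and []
        return db_statuses
    return rest_statuses


def pick_statuses_buggy(db_statuses, rest_statuses):
    if db_statuses is not None:     # bug: [] short-circuits the fallback
        return db_statuses
    return rest_statuses
```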

Bug: Test connection never exercises the API due to unevaluated generator

📄 ingestion/src/metadata/ingestion/source/pipeline/fivetran/client.py:37-51
list_groups() now returns a generator (Iterable[dict]) via _run_paginator, which uses yield. In connection.py:51, it is registered as the test-connection function: {"GetPipelines": client.list_groups}. The test_connection_steps framework calls step.function() but never iterates the result — it only checks that no exception is raised. Since a generator function returns immediately without executing its body, the actual HTTP call to Fivetran's /groups endpoint is never made, and the test connection will always pass regardless of whether the API credentials are valid.
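
The vacuous-pass mechanism is a standard Python pitfall and is easy to demonstrate; the HTTP call is simulated here with a list append.

```python
# Calling a generator function runs none of its body until iteration,
# so a test that only calls it can never fail. Forcing evaluation with
# list() actually executes the (simulated) request.
calls = []


def list_groups():
    calls.append("GET /groups")     # stands in for the real HTTP request
    yield {"id": "g1"}


gen = list_groups()                 # body has NOT run yet
assert calls == []

groups = list(list_groups())        # iteration executes the body
```

This is why the fix registers `list(client.list_groups())` rather than the bare generator as the test-connection step.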

Bug: Invalid cron for sync intervals >59 and not divisible by 60

📄 ingestion/src/metadata/ingestion/source/pipeline/fivetran/metadata.py:658-659
When minutes is between 60 and not divisible by 60 (e.g. 90, 150), the new code at lines 658-659 produces */90 * * * *. The cron minute field only accepts values 0-59, so */90 is effectively equivalent to */0 or fires only at minute 0 — it does not run every 90 minutes as intended.

A correct approach for intervals that don't map cleanly to cron would be to return None (unknown schedule) or use a comment/description string, since standard cron cannot express arbitrary minute intervals above 59.


