…rvice names, and schema support
- Report failures to status when stage processor throws an exception in topology_runner
- Add get_messaging_service_names() to PipelineServiceSource for messaging lineage support
- Add messagingServiceNames to pipelineServiceMetadataPipeline JSON schema
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Set taskType="sync" on Fivetran pipeline tasks
- Implement yield_pipeline_status() to derive execution history from succeeded_at/failed_at timestamps in connector details
- Add failed_at to mock dataset for test coverage
- Add tests for task type, status with both/one/no timestamps
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
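A minimal sketch of deriving execution history from the two connector timestamps. It assumes ISO-8601 strings (Python 3.11+ parses a trailing `Z` natively; the `replace` keeps older versions working), and `RunStatus`, `_parse_ts`, and `statuses_from_connector` are hypothetical names, not the connector's actual models:

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional


@dataclass
class RunStatus:
    """Hypothetical stand-in for a pipeline-status record."""
    status: str  # "Successful" or "Failed"
    timestamp: datetime


def _parse_ts(value: Optional[str]) -> Optional[datetime]:
    # Accepts e.g. "2024-01-01T10:00:00.000Z"; the Z suffix becomes +00:00
    if not value:
        return None
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def statuses_from_connector(details: Dict[str, Any]) -> List[RunStatus]:
    """Emit one status per timestamp that is present, newest first."""
    runs: List[RunStatus] = []
    succeeded = _parse_ts(details.get("succeeded_at"))
    failed = _parse_ts(details.get("failed_at"))
    if succeeded:
        runs.append(RunStatus("Successful", succeeded))
    if failed:
        runs.append(RunStatus("Failed", failed))
    return sorted(runs, key=lambda r: r.timestamp, reverse=True)
```

With both timestamps the latest outcome comes first; one timestamp yields one status, and none yields an empty list.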
…phases, and lineage fixes
The Fivetran REST API sync-history endpoint has very limited retention
(entries age out within hours). This change queries the destination
warehouse's fivetran_metadata.log table directly for comprehensive sync
history with accurate per-phase timing, falling back to the REST API
when the destination DB is unavailable.
Key changes:
Destination DB sync history:
- Resolve the destination warehouse's DatabaseService from the service
registry using the existing dbServiceNames lineage configuration
- Query fivetran_metadata.log table with sqlglot-generated quoted
identifiers (dialect-aware for Snowflake uppercase, Postgres lowercase)
- Parse LOG events (sync_start, extract_summary, write_to_table_start/end,
sync_end, sync_stats) to derive per-phase timing and status
- Graceful fallback to REST API on any failure (unsupported destination
type, missing service, query error)
- Time-bounded queries (90-day retention) to avoid unbounded fetchall()
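The time-bounded query and fallback can be sketched as follows. SQLite stands in for the destination warehouse purely to keep the example self-contained (a later commit in this PR moves the real query to SQLAlchemy), the column names `connector_id`, `time_stamp`, `message_event`, and `message_data` are assumptions about the `log` table's layout, and `fetch_sync_log` is a hypothetical helper:

```python
import sqlite3
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Dict, List, Optional

RETENTION_DAYS = 90  # same window as the time-bounded query described above


def fetch_sync_log(
    conn: sqlite3.Connection,
    connector_id: str,
    rest_fallback: Callable[[], List[Dict[str, Any]]],
    now: Optional[datetime] = None,
) -> List[Dict[str, Any]]:
    """Read recent LOG rows for one connector; use the REST API otherwise."""
    now = now or datetime.now(timezone.utc)
    # ISO strings compare correctly here only because every row uses the same format
    cutoff = (now - timedelta(days=RETENTION_DAYS)).isoformat()
    try:
        rows = conn.execute(
            "SELECT time_stamp, message_event, message_data FROM log "
            "WHERE connector_id = ? AND time_stamp >= ? ORDER BY time_stamp",
            (connector_id, cutoff),
        ).fetchall()
    except Exception:  # any DB failure: missing table, bad credentials, ...
        return rest_fallback()
    if not rows:  # an empty history also falls back instead of yielding nothing
        return rest_fallback()
    return [
        {"time_stamp": ts, "message_event": event, "message_data": data}
        for ts, event, data in rows
    ]
```

Treating an empty result like a failure matches the later review finding that empty DB log rows must not skip the REST fallback.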
ELT task phases (Extract → Process → Load):
- Replace single "sync" task with three distinct pipeline tasks
representing Fivetran's ELT phases
- Each task has independent timing and status derived from LOG events
- sync_stats durations used as fallback when intermediate events are
missing (e.g., incremental syncs with no data changes)
- Task DAG wiring via downstreamTasks for UI rendering
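A sketch of the three-task wiring, assuming a hypothetical `PhaseTask` record and a `<connector_id>_<phase>` naming scheme (neither is taken from the connector's code). Each task lists only its immediate successor in `downstream_tasks`, which is enough for a UI to draw Extract → Process → Load:

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class PhaseTask:
    """Hypothetical stand-in for OpenMetadata's Task model."""
    name: str
    task_type: str
    downstream_tasks: List[str] = field(default_factory=list)


PHASES = ("Extract", "Process", "Load")  # Fivetran's ELT order


def build_elt_tasks(connector_id: str) -> List[PhaseTask]:
    """Create one task per phase and chain each to the next one."""
    names = [f"{connector_id}_{phase.lower()}" for phase in PHASES]
    tasks: List[PhaseTask] = []
    for i, (name, phase) in enumerate(zip(names, PHASES)):
        # The last phase (Load) has no downstream task
        downstream = [names[i + 1]] if i + 1 < len(names) else []
        tasks.append(PhaseTask(name=name, task_type=phase, downstream_tasks=downstream))
    return tasks
```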
Lineage fixes:
- Fix service name fallback: change `or "*"` to `or []` to prevent
building FQNs with literal "*" as service name
- Resolve pipeline entity once per connector instead of per-table (N+1)
- Fetch destination details once per group instead of per-connector (N+1)
- Support messaging sources (Kafka, Confluent Cloud) with topic lineage
- Column-level lineage via Fivetran schema API
- Self-lineage prevention (source == destination entity)
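The service-name fallback and the self-lineage guard can be illustrated with plain strings. The helpers below are hypothetical and build simplified dot-joined FQNs (OpenMetadata's real FQN builder also handles quoting). The point is that `None or "*"` is an iterable containing one `"*"`, so every lookup targeted a service literally named `*`:

```python
from typing import List, Optional, Tuple


def db_service_names(configured: Optional[List[str]]) -> List[str]:
    # `or []` means "nothing configured", so callers simply skip the lookup
    return configured or []


def candidate_table_fqns(services: List[str], database: str, schema: str, table: str) -> List[str]:
    # Simplified FQN: service.database.schema.table, without quoting rules
    return [f"{svc}.{database}.{schema}.{table}" for svc in services]


def lineage_edge(source_fqn: str, dest_fqn: str) -> Optional[Tuple[str, str]]:
    """Return an edge unless it would point an entity at itself."""
    if source_fqn == dest_fqn:
        return None
    return (source_fqn, dest_fqn)
```

With the old fallback, `candidate_table_fqns(None or "*", "db", "s", "t")` iterates over the string `"*"` and returns `["*.db.s.t"]`.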
Client robustness:
- Add null/type guards to run_paginator for None API responses
- Fix get_connector_details/get_destination_details to return {} instead
of None on failure
- Fix base64 token encoding to use .decode("ascii") instead of str()[2:-1]
- Fix type annotation from Optional[Response] to Optional[dict]
- Remove unused Response import
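A sketch of two of the client fixes. `basic_auth_token` shows the explicit ASCII decode, and `iter_items` shows cursor pagination that stops on a `None` or non-dict response instead of raising. The `{"data": {"items": [...], "next_cursor": ...}}` shape is an assumption about Fivetran's REST responses, and both function names are hypothetical:

```python
import base64
from typing import Any, Callable, Dict, Iterator, Optional


def basic_auth_token(api_key: str, api_secret: str) -> str:
    """Base64-encode "key:secret" for an HTTP Basic Authorization header."""
    raw = f"{api_key}:{api_secret}".encode("utf-8")
    # .decode("ascii") is explicit; str(b"...")[2:-1] relies on the bytes repr format
    return base64.b64encode(raw).decode("ascii")


def iter_items(
    fetch_page: Callable[[Optional[str]], Any],
) -> Iterator[Dict[str, Any]]:
    """Yield items across cursor-paginated pages, stopping on malformed responses."""
    cursor: Optional[str] = None
    while True:
        response = fetch_page(cursor)
        if not isinstance(response, dict):  # None or unexpected type: stop quietly
            return
        data = response.get("data")
        if not isinstance(data, dict):
            return
        yield from data.get("items") or []
        cursor = data.get("next_cursor")
        if not cursor:
            return
```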
Other improvements:
- Display name shows source first: "postgres <> Snowflake"
- Task type "Process" (not "Transform") for the processing phase
- sourceUrl only on Pipeline, not on individual Task objects
- Add copyright headers to models.py and service_spec.py
- Add return type annotations to model properties
- 34 unit tests covering DB query path, fallback scenarios, lineage
resolution, column lineage, and task status building
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ure, schedule interval edge cases
- Add null guard after pipeline_entity resolution in lineage yield to prevent AttributeError crash when pipeline entity is not found
- Cascade extract failure to process/load status instead of reporting false success when write events or sync_stats timestamps exist
- Handle non-hour-divisible schedule intervals (e.g. 90 min) and clamp values >= 24 hours to daily cron
- Add 5 tests covering all three fixes
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
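One way to map a sync interval in minutes onto a five-field cron string. Clamping at 24 hours follows the commit; rounding 90 minutes down to an hourly schedule is an assumption made here, since cron cannot express a 90-minute period exactly. `interval_to_cron` is a hypothetical name:

```python
from typing import Optional

MINUTES_PER_DAY = 24 * 60


def interval_to_cron(minutes: Optional[int]) -> Optional[str]:
    """Approximate a sync interval (in minutes) as a 5-field cron string."""
    if not minutes or minutes <= 0:
        return None
    if minutes >= MINUTES_PER_DAY:
        return "0 0 * * *"  # clamp intervals of a day or more to daily
    if minutes < 60:
        # Exact only when the interval divides 60 (5, 15, 30, ...)
        return f"*/{minutes} * * * *"
    hours = minutes // 60  # e.g. 90 -> 1: round down to whole hours (assumption)
    return "0 * * * *" if hours == 1 else f"0 */{hours} * * *"
```

The minute field only accepts 0 to 59, which is why a naive `*/90` would be invalid.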
…ange
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Add isinstance(response, dict) guards in get_connector_schema_details and get_connector_column_lineage, consistent with get_destination_details
- Replace datetime.utcnow() with datetime.now(timezone.utc) for Python 3.12+, where utcnow() is deprecated
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…me fallback
- Add parentheses around messagingServiceNames to fix operator precedence
when lineageInformation is None
- Use (dict.get("config") or {}) pattern to handle explicit null values
from the Fivetran API without AttributeError
- Use tz-aware datetime.min fallback in sync sorting to avoid TypeError
with tz-aware DB timestamps
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
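The `or {}` pattern and the tz-aware floor, sketched with hypothetical helpers `config_value` and `newest_first` (the `start` key is an assumption). `dict.get("config", {})` would still return `None` when the API sends an explicit `null`, and a naive `datetime.min` cannot be ordered against aware timestamps:

```python
from datetime import datetime, timezone
from typing import Any, Dict, List

# Aware floor for missing timestamps; a naive datetime.min would raise TypeError
AWARE_MIN = datetime.min.replace(tzinfo=timezone.utc)


def config_value(details: Dict[str, Any], key: str) -> Any:
    # Covers both a missing "config" key and an explicit null value
    return (details.get("config") or {}).get(key)


def newest_first(syncs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Sort syncs by start time, placing syncs without a start time last."""
    return sorted(syncs, key=lambda s: s.get("start") or AWARE_MIN, reverse=True)
```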
- Filter _resolve_destination_service by dest_service_type to avoid resolving to the source database service instead of the destination
- Mark process_status as Successful for zero-change incremental syncs where extract succeeds and sync_end is SUCCESSFUL but no write events
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
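A sketch of how per-phase statuses might be derived, combining the extract-failure cascade from the earlier commit with the zero-change rule above. The `Status` enum is a stand-in for OpenMetadata's `StatusType`, `phase_statuses` is hypothetical, and every rule other than those two (for example, treating a missing `sync_end` as pending) is an assumption:

```python
from enum import Enum
from typing import Optional, Tuple


class Status(str, Enum):
    """Stand-in for OpenMetadata's StatusType (member names assumed)."""
    SUCCESSFUL = "Successful"
    FAILED = "Failed"
    PENDING = "Pending"


def phase_statuses(
    extract: Optional[Status], wrote_tables: bool, sync_end: Optional[str]
) -> Tuple[Status, ...]:
    """Return (extract, process, load) statuses for one sync."""
    if extract is None:  # no extract outcome yet: everything is pending
        return (Status.PENDING,) * 3
    if extract is Status.FAILED:
        # Cascade: later phases cannot have succeeded, even if write events exist
        return (Status.FAILED,) * 3
    if sync_end is None:
        end = Status.PENDING
    elif sync_end == "SUCCESSFUL":
        end = Status.SUCCESSFUL
    else:
        end = Status.FAILED
    if wrote_tables:
        # Writes started, so processing finished; the load outcome follows sync_end
        return extract, Status.SUCCESSFUL, end
    # Zero-change incremental sync: nothing to write, so process/load mirror sync_end
    return extract, end, end
```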
…tests
- Revert out-of-scope topology_runner.py change (Harsha)
- Remove UNSUPPORTED_DESTINATION_TYPES: the Fivetran Platform Connector is available on all destinations, including Databricks
- Move FIVETRAN_STATUS_MAP and HISTORICAL_SYNC_FIELDS to module level
- Replace sqlglot + raw SQL with SQLAlchemy MetaData.reflect() and select(); use yield_per(100) for OOM protection
- Extract _try_parse_json helper to reduce nesting in _parse_sync_events
- Standardize StatusType enum usage (remove .value calls)
- Fix operator precedence in get_db_service_names/get_storage_service_names
- Add unit tests for schedule interval edge cases, malformed JSON, multi-sync parsing, and fallback task statuses
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Remove TestRunStageProcessorErrorReporting test class since the corresponding topology_runner.py status.failed() change was reverted as out of scope for this PR. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Pull request overview
This PR enhances the Fivetran pipeline connector across spec/UI schemas and the ingestion framework by adding messaging lineage configuration, improving SSL handling, and significantly expanding Fivetran pipeline modeling (tasks, status, lineage) and its unit tests.
Changes:
- Add `messagingServiceNames` to pipeline lineage configuration (spec + UI generated types) and expose it in ingestion base class helpers.
- Improve Fivetran connection/client (SSL verification support, auth token handling, retries, sync-history endpoint support).
- Refactor/extend Fivetran ingestion logic: 3-phase ELT tasks, pipeline state/schedule derivation, pipeline status from warehouse logs with fallbacks, messaging (topic→table) lineage, and expanded pytest unit coverage.
Reviewed changes
Copilot reviewed 10 out of 16 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts | Regenerated UI types to include messagingServiceNames under lineage info. |
| openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/pipelineServiceMetadataPipeline.ts | Regenerated UI types reflecting pipeline lineage schema updates. |
| openmetadata-ui/src/main/resources/ui/src/generated/entity/services/ingestionPipelines/ingestionPipeline.ts | Regenerated ingestion pipeline entity types with messagingServiceNames. |
| openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts | Regenerated API types to carry messagingServiceNames. |
| openmetadata-spec/src/main/resources/json/schema/metadataIngestion/pipelineServiceMetadataPipeline.json | Adds messagingServiceNames to pipeline lineage configuration schema. |
| openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/fivetranConnection.json | Adds verifySSL/sslConfig fields for Fivetran connection SSL configuration. |
| ingestion/tests/unit/topology/pipeline/test_fivetran.py | Migrates/refactors tests to pytest; adds coverage for new status/lineage utilities and behaviors. |
| ingestion/tests/unit/resources/datasets/fivetran_dataset.json | Updates fixture data (failed_at) for status/historical fallback testing. |
| ingestion/src/metadata/ingestion/source/pipeline/pipeline_service.py | Adds helper to read messagingServiceNames from lineageInformation. |
| ingestion/src/metadata/ingestion/source/pipeline/fivetran/service_spec.py | Adds license header; keeps service spec wiring. |
| ingestion/src/metadata/ingestion/source/pipeline/fivetran/models.py | Adds typing; adjusts Fivetran pipeline display-name formatting. |
| ingestion/src/metadata/ingestion/source/pipeline/fivetran/metadata.py | Major connector logic upgrades: ELT tasks, schedule/state, status resolution (DB/REST/historical), messaging lineage, column lineage refactor. |
| ingestion/src/metadata/ingestion/source/pipeline/fivetran/fivetran_log.py | New module to query/parse warehouse log table and derive per-phase task statuses. |
| ingestion/src/metadata/ingestion/source/pipeline/fivetran/client.py | SSL verification support + improved token handling; adds sync-history pagination; refactors pagination to iterator. |
| .gitignore | Ignores .claude directory/files. |
3036fae to 24378da
Co-authored-by: harshach <38649+harshach@users.noreply.github.com>
Resolved the merge conflicts in commit
All the Copilot comments have been addressed across previous commits. Here's a summary of what was resolved:
Code Review ✅ Approved (3 resolved / 3 findings)
Fivetran integration logic now correctly handles DB log row skipping, API connection tests, and sync interval cron validation. These fixes resolve the reported functional inconsistencies and generator evaluation issues.
✅ 3 resolved
✅ Bug: Empty DB log rows skip REST fallback, yielding zero statuses
✅ Bug: Test connection never exercises the API due to unevaluated generator
✅ Bug: Invalid cron for sync intervals >59 and not divisible by 60
Describe your changes:
Fixes
I worked on ... because ...
Type of change:
Checklist:
Fixes <issue-number>: <short explanation>
Summary by Gitar
- … `if-else` blocks in `parse_sync_events` with a dispatch-based `_EVENT_HANDLERS` map for cleaner maintenance.
- … `write_to_table_start` and `write_to_table_end` to correctly maintain minimum and maximum timestamps across partitioned events.
- … `test_fivetran.py` covering event grouping, malformed JSON handling, and logic validation for timestamps.
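A sketch of the dispatch-map idea from the summary, assuming each log row is a `(sync_id, message_event, time_stamp)` tuple. `SyncTiming`, `group_sync_events`, and the handler functions are hypothetical; only the `_EVENT_HANDLERS` name and the min/max behavior come from the summary:

```python
from datetime import datetime
from typing import Callable, Dict, List, Optional, Tuple


class SyncTiming:
    """Hypothetical per-sync accumulator for phase timestamps."""

    def __init__(self) -> None:
        self.started: Optional[datetime] = None
        self.write_start: Optional[datetime] = None
        self.write_end: Optional[datetime] = None
        self.ended: Optional[datetime] = None


def _on_sync_start(t: SyncTiming, ts: datetime) -> None:
    t.started = ts


def _on_write_start(t: SyncTiming, ts: datetime) -> None:
    # Partitioned writes emit many start events: keep the earliest
    t.write_start = ts if t.write_start is None else min(t.write_start, ts)


def _on_write_end(t: SyncTiming, ts: datetime) -> None:
    # Likewise for end events: keep the latest
    t.write_end = ts if t.write_end is None else max(t.write_end, ts)


def _on_sync_end(t: SyncTiming, ts: datetime) -> None:
    t.ended = ts


_EVENT_HANDLERS: Dict[str, Callable[[SyncTiming, datetime], None]] = {
    "sync_start": _on_sync_start,
    "write_to_table_start": _on_write_start,
    "write_to_table_end": _on_write_end,
    "sync_end": _on_sync_end,
}


def group_sync_events(events: List[Tuple[str, str, datetime]]) -> Dict[str, SyncTiming]:
    """Group rows by sync id and apply the handler registered for each event."""
    syncs: Dict[str, SyncTiming] = {}
    for sync_id, event, ts in events:
        handler = _EVENT_HANDLERS.get(event)
        if handler is None:
            continue  # events this sketch does not track
        handler(syncs.setdefault(sync_id, SyncTiming()), ts)
    return syncs
```

Supporting another event type then means adding one handler and one dictionary entry rather than another `elif` branch.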