[SPARK-55951][SQL] Add ChangelogTable schema validation and INVALID_CHANGELOG_SCHEMA error class#55507
Conversation
…HANGELOG_SCHEMA error class Validate the CDC metadata columns, row identity and row versioning returned by a `Changelog` connector at relation construction time, and introduce a dedicated error class to report the failure at analysis time rather than later at execution time with a less helpful error. - `ChangelogTable.validateSchema`: fail-fast checks that the connector schema contains the required metadata columns (`_change_type`, `_commit_version`, `_commit_timestamp`), and that, when the connector advertises a capability requiring it, `rowId()` and `rowVersion()` are declared and the row version column is a non-nullable top-level column. Invoked from the `ChangelogTable` constructor. - New error class `INVALID_CHANGELOG_SCHEMA` with sub-classes `MISSING_COLUMN`, `INVALID_COLUMN_TYPE`, `MISSING_ROW_ID`, `MISSING_ROW_VERSION`, `NESTED_ROW_VERSION`, `NULLABLE_ROW_VERSION`. - `QueryCompilationErrors` helpers for each sub-class. - Tests: `ChangelogResolutionSuite` schema-validation cases using a `TestChangelog` fixture that returns hand-crafted schemas.
5f5279e to
753bea1
Compare
johanl-db
left a comment
There was a problem hiding this comment.
i don't have concerns about this change, some minor improvements suggested
There was a problem hiding this comment.
The last part is directed to connector developers and not actionable for actual users, remove it
| }, | ||
| "MISSING_ROW_VERSION" : { | ||
| "message" : [ | ||
| "Connector advertises `containsCarryoverRows` or `representsUpdateAsDeleteAndInsert` is `true`, but `Changelog.rowVersion()` is not implemented. Override `rowVersion()` to return a `NamedReference` pointing to a non-nullable column in `Changelog.columns()`." |
There was a problem hiding this comment.
Sane here, remove the last part.
Also grammar: 'Connector advertises ... is 'true
| // `rowId()` / `rowVersion()` default to throwing UnsupportedOperationException for | ||
| // connectors that haven't opted in. Translate that into "not declared" so we can | ||
| // reason about it as Option/empty-array below. | ||
| val rowIds: Array[NamedReference] = try cl.rowId() catch { | ||
| case _: UnsupportedOperationException => Array.empty | ||
| } | ||
| val rowVersionRef: Option[NamedReference] = try Some(cl.rowVersion()) catch { | ||
| case _: UnsupportedOperationException => None | ||
| } |
There was a problem hiding this comment.
I'm on the fence here, I think I'd rather not catch UnsupportedOperationException and surface that error directly.
This is an error from connector developers that didn't implement the right methods even tough the capabilities they reported require it. So it doesn't really need to have a user-facing error message, surfacing UnsupportedOperationException would actually make it a bit more obvious that there's an issue with the connector itself.
The important point is that we only call these methods here if the capabilities require it
gengliangwang
left a comment
There was a problem hiding this comment.
Summary
Clean, contained change: a connector-side Changelog that returns a misshapen CDC schema now gets a sharp INVALID_CHANGELOG_SCHEMA.* at analysis time, replacing the earlier opaque execution-time failure. The validator runs eagerly in ChangelogTable's constructor, which is the right boundary — everything downstream (resolution, planning, scans) then sees a schema it can trust.
A handful of things worth addressing before merging, in priority order:
rowIdnon-nullability is not validated, even though theChangelog.rowId()Javadoc says "Each referenced column must be non-nullable" and the existing peerSupportsDelta.rowId()path (resolveRowIdAttrs→NULLABLE_ROW_ID_ATTRIBUTES) has been doing this check for years. This PR is asymmetric:rowVersiongets nullability + top-level-ness,rowIdgets presence only.- The new
NESTED_ROW_VERSIONconstraint ("rowVersion must be a top-level column") is not documented onChangelog.rowVersion(). Right now a connector author can follow the Javadoc exactly ("non-nullable") and still trip this error. Either add the requirement to the Javadoc or drop the check. - PR description overstates test coverage. The "How was this patch tested?" section lists tests for "row-identity-required capabilities without rowId/rowVersion" and "nested rowVersion", but the suite only exercises metadata presence/types, nullable
rowVersion, and valid schemas — there is noMISSING_ROW_ID,MISSING_ROW_VERSION, orNESTED_ROW_VERSIONcase, and capability triggers other thancontainsCarryoverRows=trueare unexercised.
Remaining inline comments are smaller (scoping, error-message specificity, a comment typo).
For the error-text wording around MISSING_ROW_ID / MISSING_ROW_VERSION I'll defer to @johanl-db's existing comments rather than duplicate.
| changelog: Changelog, | ||
| changelogInfo: ChangelogInfo) extends Table with SupportsRead { | ||
|
|
||
| // Validate the connector returned a schema with the required CDC metadata columns |
There was a problem hiding this comment.
Minor grammar — "Validate that the connector …".
| // Validate the connector returned a schema with the required CDC metadata columns | |
| // Validate that the connector returned a schema with the required CDC metadata columns |
|
|
||
| object ChangelogTable { | ||
|
|
||
| def validateSchema(cl: Changelog): Unit = { |
There was a problem hiding this comment.
Consider scoping to private[v2]: ChangelogTable is documented as NOT a connector API surface, and validateSchema is only called from the primary constructor. Keeping it public invites external callers to bypass or double-invoke the validation.
| cl.containsIntermediateChanges() | ||
| if (needsRowId && (rowIds == null || rowIds.isEmpty)) { | ||
| throw QueryCompilationErrors.changelogMissingRowIdError(cl.name) | ||
| } |
There was a problem hiding this comment.
rowId columns are not checked for non-nullability, even though (a) the Changelog.rowId() Javadoc requires "Each referenced column must be non-nullable", and (b) the peer row-level-operations path validates this via RewriteRowLevelCommand.resolveRowIdAttrs with NULLABLE_ROW_ID_ATTRIBUTES. Consider adding a parallel NULLABLE_ROW_ID sub-class (or at least stating explicitly that rowId column validation is deferred to a later PR). As written, rowVersion gets nullability + top-level-ness but rowId gets presence only.
| // delete+insert pair would be misclassified as a real update). | ||
| rowVersionRef.foreach { ref => | ||
| val fieldNames = ref.fieldNames() | ||
| if (fieldNames.length != 1) { |
There was a problem hiding this comment.
The top-level requirement is new — the Changelog.rowVersion() Javadoc only says "non-nullable". A connector that reads the contract and returns a nested NamedReference will fail with NESTED_ROW_VERSION but get no hint from the API docs. Please either (a) update the Changelog.rowVersion() Javadoc to state that the reference must be a top-level column of columns(), or (b) remove this check and accept nested references. Same applies (by extension) to rowId if top-level-ness is also intended there.
| } | ||
| val columnName = fieldNames(0) | ||
| val col = byName.getOrElse(columnName, | ||
| throw QueryCompilationErrors.changelogMissingColumnError(cl.name, columnName)) |
There was a problem hiding this comment.
When rowVersion() points to a column that isn't in columns(), we throw MISSING_COLUMN with the referenced column name — the user sees "Required column <x> is missing", as if a required metadata column were missing. The real cause is "rowVersion references a column not in the schema." Consider a dedicated sub-class (e.g. ROW_VERSION_COLUMN_NOT_FOUND) or at least a reworded message. This path also has no test.
There was a problem hiding this comment.
Three sub-classes introduced in this PR have no test case: MISSING_ROW_ID, MISSING_ROW_VERSION, NESTED_ROW_VERSION. Capability triggers other than containsCarryoverRows=true are also untested (no case exercising representsUpdateAsDeleteAndInsert=true or containsIntermediateChanges=true, and no case with a rowVersion referencing a column that is absent from columns()). The TestChangelog fixture already supports these — a few small cases would close the coverage gap the PR description claims is already covered.

What changes were proposed in this pull request?
This is PR 1 of a split of #55426 (see the split suggestion for the full plan). Can merge in any order, but 1 (#55507) < 2 (#55508) would be preferable. For more context, see discussion posted to dev@spark.apache.org and linked SPIP.
Validates the CDC metadata columns, row identity and row versioning returned by a
Changelogconnector at relation construction time, and introduces a dedicated error class to report the failure at analysis time rather than later at execution time with a less helpful error.ChangelogTable.validateSchema: fail-fast checks that the connector schema contains the required metadata columns (_change_type,_commit_version,_commit_timestamp), and that — when the connector advertises a capability requiring it —rowId()androwVersion()are declared and the row version column is a non-nullable top-level column. Invoked from theChangelogTableconstructor.INVALID_CHANGELOG_SCHEMAwith sub-classes:MISSING_COLUMN,INVALID_COLUMN_TYPEMISSING_ROW_ID,MISSING_ROW_VERSION,NESTED_ROW_VERSION,NULLABLE_ROW_VERSIONQueryCompilationErrorshelpers for each sub-class.ChangelogResolutionSuiteschema-validation cases using aTestChangelogfixture.Why are the changes needed?
Gives connector implementors a clear analysis-time error message for misshapen CDC schemas instead of an opaque execution-time failure. Background on the original PR and its discussion thread.
Does this PR introduce any user-facing change?
Yes, for connector implementors. A connector that returns an invalid changelog schema (or advertises a capability that requires row identity/row versioning without declaring them) now fails at analysis time with
INVALID_CHANGELOG_SCHEMA.*instead of at execution time.How was this patch tested?
Added schema-validation cases to
ChangelogResolutionSuitecovering: missing metadata column (each of_change_type,_commit_version,_commit_timestamp), wrong data type, connector-defined_commit_versiontype accepted, row-identity-required capabilities without rowId/rowVersion, nested rowVersion, nullable rowVersion, and valid schemas with data columns pass.Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.7