[SPARK-55951][SQL] Add ChangelogTable schema validation and INVALID_CHANGELOG_SCHEMA error class by SanJSp · Pull Request #55507 · apache/spark · GitHub
Skip to content

[SPARK-55951][SQL] Add ChangelogTable schema validation and INVALID_CHANGELOG_SCHEMA error class#55507

Open
SanJSp wants to merge 1 commit intoapache:masterfrom
SanJSp:SPARK-55668-PR1-changelog-schema-validation
Open

[SPARK-55951][SQL] Add ChangelogTable schema validation and INVALID_CHANGELOG_SCHEMA error class#55507
SanJSp wants to merge 1 commit intoapache:masterfrom
SanJSp:SPARK-55668-PR1-changelog-schema-validation

Conversation

@SanJSp
Copy link
Copy Markdown

@SanJSp SanJSp commented Apr 23, 2026

What changes were proposed in this pull request?

This is PR 1 of a split of #55426 (see the split suggestion for the full plan). Can merge in any order, but 1 (#55507) < 2 (#55508) would be preferable. For more context, see discussion posted to dev@spark.apache.org and linked SPIP.

Validates the CDC metadata columns, row identity and row versioning returned by a Changelog connector at relation construction time, and introduces a dedicated error class to report the failure at analysis time rather than later at execution time with a less helpful error.

  • ChangelogTable.validateSchema: fail-fast checks that the connector schema contains the required metadata columns (_change_type, _commit_version, _commit_timestamp), and that — when the connector advertises a capability requiring it — rowId() and rowVersion() are declared and the row version column is a non-nullable top-level column. Invoked from the ChangelogTable constructor.
  • New error class INVALID_CHANGELOG_SCHEMA with sub-classes:
    • MISSING_COLUMN, INVALID_COLUMN_TYPE
    • MISSING_ROW_ID, MISSING_ROW_VERSION, NESTED_ROW_VERSION, NULLABLE_ROW_VERSION
  • Matching QueryCompilationErrors helpers for each sub-class.
  • Tests: ChangelogResolutionSuite schema-validation cases using a TestChangelog fixture.

Why are the changes needed?

Gives connector implementors a clear analysis-time error message for misshapen CDC schemas instead of an opaque execution-time failure. Background on the original PR and its discussion thread.

Does this PR introduce any user-facing change?

Yes, for connector implementors. A connector that returns an invalid changelog schema (or advertises a capability that requires row identity/row versioning without declaring them) now fails at analysis time with INVALID_CHANGELOG_SCHEMA.* instead of at execution time.

How was this patch tested?

Added schema-validation cases to ChangelogResolutionSuite covering: missing metadata column (each of _change_type, _commit_version, _commit_timestamp), wrong data type, connector-defined _commit_version type accepted, row-identity-required capabilities without rowId/rowVersion, nested rowVersion, nullable rowVersion, and valid schemas with data columns pass.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.7

@SanJSp SanJSp changed the title [SPARK-55668][SQL] Add ChangelogTable schema validation and INVALID_CHANGELOG_SCHEMA error class [SPARK-55951][SQL] Add ChangelogTable schema validation and INVALID_CHANGELOG_SCHEMA error class Apr 23, 2026
…HANGELOG_SCHEMA error class

Validate the CDC metadata columns, row identity and row versioning returned
by a `Changelog` connector at relation construction time, and introduce a
dedicated error class to report the failure at analysis time rather than
later at execution time with a less helpful error.

- `ChangelogTable.validateSchema`: fail-fast checks that the connector
  schema contains the required metadata columns (`_change_type`,
  `_commit_version`, `_commit_timestamp`), and that, when the connector
  advertises a capability requiring it, `rowId()` and `rowVersion()` are
  declared and the row version column is a non-nullable top-level column.
  Invoked from the `ChangelogTable` constructor.
- New error class `INVALID_CHANGELOG_SCHEMA` with sub-classes
  `MISSING_COLUMN`, `INVALID_COLUMN_TYPE`, `MISSING_ROW_ID`,
  `MISSING_ROW_VERSION`, `NESTED_ROW_VERSION`, `NULLABLE_ROW_VERSION`.
- `QueryCompilationErrors` helpers for each sub-class.
- Tests: `ChangelogResolutionSuite` schema-validation cases using a
  `TestChangelog` fixture that returns hand-crafted schemas.
@SanJSp SanJSp force-pushed the SPARK-55668-PR1-changelog-schema-validation branch from 5f5279e to 753bea1 Compare April 23, 2026 12:05
Copy link
Copy Markdown
Contributor

@johanl-db johanl-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i don't have concerns about this change, some minor improvements suggested

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The last part is directed to connector developers and not actionable for actual users, remove it

},
"MISSING_ROW_VERSION" : {
"message" : [
"Connector advertises `containsCarryoverRows` or `representsUpdateAsDeleteAndInsert` is `true`, but `Changelog.rowVersion()` is not implemented. Override `rowVersion()` to return a `NamedReference` pointing to a non-nullable column in `Changelog.columns()`."
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sane here, remove the last part.
Also grammar: 'Connector advertises ... is true'

Comment on lines +73 to +81
// `rowId()` / `rowVersion()` default to throwing UnsupportedOperationException for
// connectors that haven't opted in. Translate that into "not declared" so we can
// reason about it as Option/empty-array below.
val rowIds: Array[NamedReference] = try cl.rowId() catch {
case _: UnsupportedOperationException => Array.empty
}
val rowVersionRef: Option[NamedReference] = try Some(cl.rowVersion()) catch {
case _: UnsupportedOperationException => None
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm on the fence here, I think I'd rather not catch UnsupportedOperationException and surface that error directly.
This is an error from connector developers that didn't implement the right methods even tough the capabilities they reported require it. So it doesn't really need to have a user-facing error message, surfacing UnsupportedOperationException would actually make it a bit more obvious that there's an issue with the connector itself.

The important point is that we only call these methods here if the capabilities require it

Copy link
Copy Markdown
Member

@gengliangwang gengliangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary

Clean, contained change: a connector-side Changelog that returns a misshapen CDC schema now gets a sharp INVALID_CHANGELOG_SCHEMA.* at analysis time, replacing the earlier opaque execution-time failure. The validator runs eagerly in ChangelogTable's constructor, which is the right boundary — everything downstream (resolution, planning, scans) then sees a schema it can trust.

A handful of things worth addressing before merging, in priority order:

  1. rowId non-nullability is not validated, even though the Changelog.rowId() Javadoc says "Each referenced column must be non-nullable" and the existing peer SupportsDelta.rowId() path (resolveRowIdAttrsNULLABLE_ROW_ID_ATTRIBUTES) has been doing this check for years. This PR is asymmetric: rowVersion gets nullability + top-level-ness, rowId gets presence only.
  2. The new NESTED_ROW_VERSION constraint ("rowVersion must be a top-level column") is not documented on Changelog.rowVersion(). Right now a connector author can follow the Javadoc exactly ("non-nullable") and still trip this error. Either add the requirement to the Javadoc or drop the check.
  3. PR description overstates test coverage. The "How was this patch tested?" section lists tests for "row-identity-required capabilities without rowId/rowVersion" and "nested rowVersion", but the suite only exercises metadata presence/types, nullable rowVersion, and valid schemas — there is no MISSING_ROW_ID, MISSING_ROW_VERSION, or NESTED_ROW_VERSION case, and capability triggers other than containsCarryoverRows=true are unexercised.

Remaining inline comments are smaller (scoping, error-message specificity, a comment typo).

For the error-text wording around MISSING_ROW_ID / MISSING_ROW_VERSION I'll defer to @johanl-db's existing comments rather than duplicate.

changelog: Changelog,
changelogInfo: ChangelogInfo) extends Table with SupportsRead {

// Validate the connector returned a schema with the required CDC metadata columns
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor grammar — "Validate that the connector …".

Suggested change
// Validate the connector returned a schema with the required CDC metadata columns
// Validate that the connector returned a schema with the required CDC metadata columns


object ChangelogTable {

def validateSchema(cl: Changelog): Unit = {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider scoping to private[v2]: ChangelogTable is documented as NOT a connector API surface, and validateSchema is only called from the primary constructor. Keeping it public invites external callers to bypass or double-invoke the validation.

cl.containsIntermediateChanges()
if (needsRowId && (rowIds == null || rowIds.isEmpty)) {
throw QueryCompilationErrors.changelogMissingRowIdError(cl.name)
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rowId columns are not checked for non-nullability, even though (a) the Changelog.rowId() Javadoc requires "Each referenced column must be non-nullable", and (b) the peer row-level-operations path validates this via RewriteRowLevelCommand.resolveRowIdAttrs with NULLABLE_ROW_ID_ATTRIBUTES. Consider adding a parallel NULLABLE_ROW_ID sub-class (or at least stating explicitly that rowId column validation is deferred to a later PR). As written, rowVersion gets nullability + top-level-ness but rowId gets presence only.

// delete+insert pair would be misclassified as a real update).
rowVersionRef.foreach { ref =>
val fieldNames = ref.fieldNames()
if (fieldNames.length != 1) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The top-level requirement is new — the Changelog.rowVersion() Javadoc only says "non-nullable". A connector that reads the contract and returns a nested NamedReference will fail with NESTED_ROW_VERSION but get no hint from the API docs. Please either (a) update the Changelog.rowVersion() Javadoc to state that the reference must be a top-level column of columns(), or (b) remove this check and accept nested references. Same applies (by extension) to rowId if top-level-ness is also intended there.

}
val columnName = fieldNames(0)
val col = byName.getOrElse(columnName,
throw QueryCompilationErrors.changelogMissingColumnError(cl.name, columnName))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When rowVersion() points to a column that isn't in columns(), we throw MISSING_COLUMN with the referenced column name — the user sees "Required column <x> is missing", as if a required metadata column were missing. The real cause is "rowVersion references a column not in the schema." Consider a dedicated sub-class (e.g. ROW_VERSION_COLUMN_NOT_FOUND) or at least a reworded message. This path also has no test.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Three sub-classes introduced in this PR have no test case: MISSING_ROW_ID, MISSING_ROW_VERSION, NESTED_ROW_VERSION. Capability triggers other than containsCarryoverRows=true are also untested (no case exercising representsUpdateAsDeleteAndInsert=true or containsIntermediateChanges=true, and no case with a rowVersion referencing a column that is absent from columns()). The TestChangelog fixture already supports these — a few small cases would close the coverage gap the PR description claims is already covered.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants