You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
When a cluster leader changes (failover or re-election), a catch-up subscription would silently stop delivering events with no onCancelled callback — the application was never told the subscription had dropped, so any reconnect logic built on onCancelled never ran and the subscription stayed dead until the process was restarted.
This hits the default configuration: nodePreference=leader is the client default, so subscriptions request the leader out of the box and are exposed on any leader change. Persistent subscriptions could hit the same silent failure.
Cause
On a leader change the server tells the client "I'm no longer the leader, reconnect here". The client tried to record that new leader but threw a NullPointerException internally, because the subscription path never set up the connection context the read path already uses. The NPE was thrown beforeonCancelled ran, so the error was swallowed instead of being reported.
Fix
Subscriptions now set up that connection context, so a leader change is handled exactly like reads already handle it: the client re-points to the new leader and onCancelled is always invoked with a NotLeaderException.
Hardened both the catch-up and persistent subscription error paths so a failure while handling the redirect can never again suppress onCancelled.
• Ensure catch-up subscriptions capture connection args for leader redirect handling.
• Harden leader-redirect error handling so onCancelled always fires.
• Add regression test covering NotLeader redirect without observer args set.
Diagram
graph TD
A["AbstractRegularSubscription"] --> B["GrpcClient.runWithArgs"] --> C["StreamsGrpc stub"] --> D["ReadResponseObserver"] --> E["SubscriptionStreamConsumer"] --> F["SubscriptionListener (app)"]
G{{"gRPC server"}} --> D
D --> H["NotLeaderException"] --> E
Loading
High-Level Assessment
The following are alternative approaches to this PR:
1. Null-object args (always non-null)
➕ Eliminates null checks throughout observer code
➕ Makes it harder for future call sites to forget onConnected
➖ Can hide lifecycle bugs where args should be set but isn't
➖ Requires defining safe semantics for reportNewLeader when not connected
2. Move leader-redirect handling to a gRPC interceptor / client layer
➕ Centralizes redirect parsing and leader reporting across all call types
➕ Keeps observers focused on stream consumption
➖ More invasive refactor; higher risk for a targeted regression fix
➖ May be harder to test without broader integration scaffolding
Recommendation: Current approach is a good minimal, low-risk fix: (1) align subscription execution with the read path by using runWithArgs and calling observer.onConnected(args), and (2) make onError robust so onCancelled is still delivered even if leader parsing/reporting fails. Consider the null-object args approach only if more call paths continue to miss onConnected in the future.
Files changed (4) +68 / -7
Bug fix (2) +12 / -6
AbstractRegularSubscription.javaInitialize observer with connection args for subscriptions+4/-3
Initialize observer with connection args for subscriptions
• Switch subscription execution to GrpcClient.runWithArgs, build the stub from args.getChannel(), and call observer.onConnected(args) before starting the stream. This ensures subscription observers have args populated so leader redirects can be handled consistently.
ReadResponseObserver.javaGuard leader-redirect handling against null args and parsing failures+8/-3
Guard leader-redirect handling against null args and parsing failures
• Wrap leader redirect handling in try/catch and null-check args before calling reportNewLeader. This prevents exceptions in onError from short-circuiting cancellation propagation to consumers.
LeaderRedirectUnitTest.javaAdd regression test for leader redirect without observer args+55/-0
Add regression test for leader redirect without observer args
• Adds a unit test that constructs ReadResponseObserver without calling onConnected (args remains null) and asserts a leader-redirect onError does not throw and still triggers listener onCancelled with a NotLeaderException.
The reason will be displayed to describe this comment to others. Learn more.
@w1am 👉 Created pull request targeting release/v1.2: #397
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
When a cluster leader changes (failover or re-election), a catch-up subscription would silently stop delivering events with no
onCancelledcallback — the application was never told the subscription had dropped, so any reconnect logic built ononCancellednever ran and the subscription stayed dead until the process was restarted.This hits the default configuration:
nodePreference=leaderis the client default, so subscriptions request the leader out of the box and are exposed on any leader change. Persistent subscriptions could hit the same silent failure.Cause
On a leader change the server tells the client "I'm no longer the leader, reconnect here". The client tried to record that new leader but threw a
NullPointerExceptioninternally, because the subscription path never set up the connection context the read path already uses. The NPE was thrown beforeonCancelledran, so the error was swallowed instead of being reported.Fix
onCancelledis always invoked with aNotLeaderException.onCancelled.Closes DEV-1817