feat(sync): multi-block pre_confirmed chain, poller + storage #3723
EgeCaner wants to merge 19 commits into
| func (c *ChainReader) OldestFirst() iter.Seq[*pending.PreConfirmed] { | ||
| return func(yield func(*pending.PreConfirmed) bool) { | ||
| if c == nil { | ||
| return | ||
| } | ||
| walkOldestFirst(c.head, c.length, yield) | ||
| } | ||
| } |
There was a problem hiding this comment.
Nit (unbounded recursion on unsafe snapshots). The doc here and on walkOldestFirst justifies the recursion depth as "at most BlockHashLag", but that bound only holds for head-aligned views produced by SnapshotForHead. UnsafeSnapshot returns the full, uncapped chain (length = however far the sequencer has run ahead). OldestFirst / PendingStateAt / PendingStateBeforeIndexAt are exported and operate on any *ChainReader, so if a consumer ever iterates an unsafe snapshot the recursion depth equals the raw chain length, not BlockHashLag.
In practice the gap is small and this PR doesn't wire consumers yet, so it's not a live bug — but since the next PR adds consumers, consider either iterating non-recursively or asserting/documenting that these methods must only be called on SnapshotForHead views.
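A minimal sketch of the non-recursive option, assuming a simplified node type (the `node`, `buildChain`, and `walkOldestFirstIter` names below are hypothetical and not taken from the PR). It buffers the newest-first walk in a slice and replays it in reverse, so stack depth stays constant no matter how long the chain is:

```go
package main

import "fmt"

// node is a hypothetical stand-in for the PR's linked-list node: each entry
// points at its parent, i.e. the next-older block.
type node struct {
	number uint64
	parent *node
}

// walkOldestFirstIter visits up to length nodes reachable from head, oldest
// first, without recursion. It collects the nodes newest-first and then
// yields them in reverse order.
func walkOldestFirstIter(head *node, length int, yield func(uint64) bool) {
	if length <= 0 {
		return
	}
	var buffered []*node
	for cur := head; cur != nil && len(buffered) < length; cur = cur.parent {
		buffered = append(buffered, cur)
	}
	for i := len(buffered) - 1; i >= 0; i-- {
		if !yield(buffered[i].number) {
			return
		}
	}
}

// buildChain links count consecutive block numbers starting at first and
// returns the newest node.
func buildChain(first uint64, count int) *node {
	var head *node
	for i := 0; i < count; i++ {
		head = &node{number: first + uint64(i), parent: head}
	}
	return head
}

func main() {
	head := buildChain(100, 4) // chain 100..103, head is 103
	walkOldestFirstIter(head, 3, func(number uint64) bool {
		fmt.Println(number) // prints 101, 102, 103 on separate lines
		return true
	})
}
```

The trade-off is one slice allocation per walk in exchange for a depth that no longer depends on which snapshot the caller holds.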
| func (p *Poller) backfill( | ||
| ctx context.Context, | ||
| head *core.Header, | ||
| fromBlock uint64, | ||
| identifier string, | ||
| txCount uint64, | ||
| endExclusive uint64, | ||
| ) error { | ||
| update, err := p.dataSource.PreConfirmedBlockByNumber(ctx, fromBlock, identifier, txCount) | ||
| if err != nil { | ||
| return fmt.Errorf("polling pre_confirmed by number %d: %w", fromBlock, err) | ||
| } | ||
| if err := p.apply(update, fromBlock, txCount, head); err != nil { | ||
| return fmt.Errorf("backfilling pre_confirmed at %d: %w", fromBlock, err) | ||
| } | ||
| for n := fromBlock + 1; n < endExclusive; n++ { | ||
| update, err := p.dataSource.PreConfirmedBlockByNumber(ctx, n, "", 0) | ||
| if err != nil { | ||
| return fmt.Errorf("polling pre_confirmed by number %d: %w", n, err) | ||
| } | ||
| if err := p.apply(update, n, 0, head); err != nil { | ||
| return fmt.Errorf("backfilling pre_confirmed at %d: %w", n, err) | ||
| } | ||
| } | ||
| return nil | ||
| } |
There was a problem hiding this comment.
Nit (partial-progress re-publish). Each apply inside backfill calls p.out.Send(applied) as it goes. If a later iteration of the backfill loop (or the final apply in tick) errors out, the entries already applied this tick stay committed to storage and have already been published on the feed. The next tick re-advances, re-snapshots (now with mostRecent at the last successfully applied block) and resumes. That is the right recovery behavior, but a block whose apply succeeds on tick N and which is re-polled on tick N+1 can be published twice.
That's fine if feed consumers are idempotent on (block_number, identifier, tx_count). Worth a one-line comment here making that contract explicit, since downstream consumers land in the follow-up PR.
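To illustrate what "idempotent on (block_number, identifier, tx_count)" could look like on the consumer side, here is a rough sketch. The `updateKey` and `dedupConsumer` types are hypothetical; the real feed payload and consumers are not part of this PR:

```go
package main

import "fmt"

// updateKey mirrors the (block_number, identifier, tx_count) tuple named in
// the review comment. The type itself is an assumption for illustration.
type updateKey struct {
	blockNumber uint64
	identifier  string
	txCount     uint64
}

// dedupConsumer is a hypothetical feed consumer that ignores repeats.
type dedupConsumer struct {
	seen      map[updateKey]struct{}
	processed int
}

func newDedupConsumer() *dedupConsumer {
	return &dedupConsumer{seen: make(map[updateKey]struct{})}
}

// Handle reports whether the update was processed (true) or skipped as a
// duplicate of an earlier delivery (false).
func (d *dedupConsumer) Handle(key updateKey) bool {
	if _, duplicate := d.seen[key]; duplicate {
		return false
	}
	d.seen[key] = struct{}{}
	d.processed++
	return true
}

func main() {
	consumer := newDedupConsumer()
	key := updateKey{blockNumber: 42, identifier: "0xabc", txCount: 7}
	fmt.Println(consumer.Handle(key)) // first delivery on tick N: true
	fmt.Println(consumer.Handle(key)) // re-publish on tick N+1: false
}
```

A real consumer would also need to prune the seen set as the canonical head advances; that is left out here.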
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #3723 +/- ##
==========================================
+ Coverage 75.27% 75.49% +0.21%
==========================================
Files 438 440 +2
Lines 39536 39844 +308
==========================================
+ Hits 29761 30079 +318
+ Misses 7701 7672 -29
- Partials 2074 2093 +19
Claude finished @EgeCaner's task in 4m 5s
Claude PR Review
Reviewed the multi-block
What I independently verified
Known nits (already filed; I concur, nothing blocking)
One thing to add to that idempotency contract
Note that
Nice work: the storage abstraction is clean and the doc comments make the invariants easy to verify.
| // newer than it, so concurrent readers walking a prior snapshot see a stable | ||
| // graph. Popped nodes become unreferenced and GC-collectable. | ||
| type node struct { | ||
| pc *pending.PreConfirmed |
There was a problem hiding this comment.
Let's use a descriptive name; pc can be many things. preconfirmed is a good one.
There was a problem hiding this comment.
Changed to preconfirmed.
| if c == nil { | ||
| return | ||
| } |
There was a problem hiding this comment.
Why would c be nil? Which scenario is this? Why would you call this on a nil pointer?
There was a problem hiding this comment.
Removed the nil check on the receiver.
c375f1f to
b5d58ae
Compare
Claude finished @EgeCaner's task in 3m 57s
Claude PR Review
Re-reviewed the latest commits (
What I re-verified on this push
Open nits (already filed; I concur, nothing blocking)
On @rodrodros's review threads
Nice work: the storage abstraction is clean and the invariants are easy to verify from the doc comments.
8bd2e59 to
dc99137
Compare
8760fd6 to
a97fa17
Compare
dc99137 to
02e0259
Compare
a97fa17 to
fb605cd
Compare
thiagodeev
left a comment
There was a problem hiding this comment.
Overall, LGTM.
I will not approve it, since it's not supposed to be merged as is (as you said, the changes must be integrated). Also, some points are missing that I will only be able to review properly in the full PR that integrates it.
I left a few comments
| var peek struct { | ||
| Changed *bool `json:"changed"` | ||
| Timestamp *uint64 `json:"timestamp"` | ||
| Changed *bool `json:"changed"` | ||
| Timestamp *uint64 `json:"timestamp"` | ||
| BlockNumber *uint64 `json:"block_number"` | ||
| } |
There was a problem hiding this comment.
Suggestion: we're unmarshalling the entire block twice here only to learn the update type, but all fields from Delta and all fields from peek (except BlockNumber) are already present in the Full response. WDYT of changing peek to be PreConfirmedBlock + BlockNumber, so that we unmarshal once and populate everything from it?
| var peek struct { | |
| Changed *bool `json:"changed"` | |
| Timestamp *uint64 `json:"timestamp"` | |
| Changed *bool `json:"changed"` | |
| Timestamp *uint64 `json:"timestamp"` | |
| BlockNumber *uint64 `json:"block_number"` | |
| } | |
| var peek struct { | |
| PreConfirmedBlock | |
| BlockNumber *uint64 `json:"block_number"` | |
| } |
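For illustration, a rough sketch of the single-unmarshal idea with an embedded struct. The trimmed `PreConfirmedBlock` fields, the `wire` type, and `classify` are assumptions made for this example; only the discrimination rules (changed false, changed true with timestamp, changed true without timestamp) come from the PR:

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// PreConfirmedBlock is a trimmed, hypothetical stand-in for the real type.
// Timestamp is a pointer here so that absence can be detected.
type PreConfirmedBlock struct {
	Timestamp    *uint64  `json:"timestamp"`
	Transactions []string `json:"transactions"`
}

// wire is decoded exactly once; the embedded block's fields are promoted,
// so they are filled from the same JSON object as the discriminators.
type wire struct {
	Changed     *bool   `json:"changed"`
	BlockNumber *uint64 `json:"block_number"`
	PreConfirmedBlock
}

// classify decodes the payload once and discriminates structurally.
func classify(data []byte) (string, error) {
	var decoded wire
	if err := json.Unmarshal(data, &decoded); err != nil {
		return "", err
	}
	if decoded.Changed == nil {
		return "", errors.New(`missing required "changed" field`)
	}
	switch {
	case !*decoded.Changed:
		return "no_change", nil
	case decoded.Timestamp != nil:
		return "full", nil
	default:
		return "delta", nil
	}
}

func main() {
	payloads := []string{
		`{"changed": false}`,
		`{"changed": true, "timestamp": 5, "block_number": 10}`,
		`{"changed": true, "transactions": ["0x1"]}`,
	}
	for _, payload := range payloads {
		kind, err := classify([]byte(payload))
		fmt.Println(kind, err)
	}
}
```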
There was a problem hiding this comment.
Pre-confirmed decode benchmark
Summary: Decoding the stream directly via DecodePreConfirmedUpdate (no UnmarshalJSON) parses the payload once vs. one extra full re-scan for a single-pass UnmarshalJSON, and two for the original peek-then-re-decode. Each removed scan cuts decode time by roughly 30-40% on the full and delta payloads (full_big: 923µs → 656µs → 383µs; ~2.4× faster overall), at nearly identical allocations.
Variants:
- original — pre-refactor UnmarshalJSON that peeks for the discriminator, then re-decodes into the concrete variant (2 full byte-scans).
- option1 — conventional UnmarshalJSON that decodes once into the flat wire struct and discriminates structurally (1 re-scan).
- current — DecodePreConfirmedUpdate, decodes the stream directly with no UnmarshalJSON (0 re-scans).
BenchmarkPreConfirmedDecodeVariants/full/original_unmarshaler-14 7168 165346 ns/op 44389 B/op 324 allocs/op
BenchmarkPreConfirmedDecodeVariants/full/option1_unmarshaler-14 9933 116690 ns/op 44160 B/op 316 allocs/op
BenchmarkPreConfirmedDecodeVariants/full/current_directDecode-14 17205 69790 ns/op 43865 B/op 310 allocs/op
BenchmarkPreConfirmedDecodeVariants/full_big/original_unmarshaler-14 1292 923323 ns/op 318737 B/op 1453 allocs/op
BenchmarkPreConfirmedDecodeVariants/full_big/option1_unmarshaler-14 1815 655790 ns/op 318513 B/op 1445 allocs/op
BenchmarkPreConfirmedDecodeVariants/full_big/current_directDecode-14 3040 383155 ns/op 318232 B/op 1439 allocs/op
BenchmarkPreConfirmedDecodeVariants/delta/original_unmarshaler-14 21176 56646 ns/op 19720 B/op 131 allocs/op
BenchmarkPreConfirmedDecodeVariants/delta/option1_unmarshaler-14 29802 40317 ns/op 19600 B/op 124 allocs/op
BenchmarkPreConfirmedDecodeVariants/delta/current_directDecode-14 48848 24343 ns/op 19304 B/op 118 allocs/op
BenchmarkPreConfirmedDecodeVariants/no_change/original_unmarshaler-14 2178157 547.8 ns/op 1168 B/op 12 allocs/op
BenchmarkPreConfirmedDecodeVariants/no_change/option1_unmarshaler-14 2086216 572.6 ns/op 1336 B/op 12 allocs/op
BenchmarkPreConfirmedDecodeVariants/no_change/current_directDecode-14 3116310 384.7 ns/op 1160 B/op 9 allocs/op
| if target == nil { | ||
| return nil, pending.ErrPreConfirmedNotFound | ||
| } |
There was a problem hiding this comment.
This check is unnecessary, correct? We already know that c.length is > 0 and that the block number is between the bottom and head block numbers, so target should never be nil. The only case would be if we stored a nil preconfirmed in the chain, but that would be a bug, right?
There was a problem hiding this comment.
You're right, any block passing the [bottom, head] check is guaranteed present in a contiguous chain, so a nil target only means a gap, i.e. a bug. I kept the check and turned it into an invariant check and updated the error accordingly.
| want := int(viewTip - wantBottom + 1) | ||
| if viewHead == c.head && want == c.length { | ||
| return *c | ||
| } | ||
| return ChainReader{head: viewHead, length: want} |
There was a problem hiding this comment.
WDYT? We are returning by value anyway, so the size in bytes is the same. Having measured both, I vote to remove this check to simplify the logic a little.
| want := int(viewTip - wantBottom + 1) | |
| if viewHead == c.head && want == c.length { | |
| return *c | |
| } | |
| return ChainReader{head: viewHead, length: want} | |
| return ChainReader{head: viewHead, length: int(viewTip - wantBottom + 1)} |
| require.Equal(t, "PRE_CONFIRMED", full.Status) | ||
| require.NotZero(t, full.Timestamp) | ||
| require.NotNil(t, full.SequencerAddress) |
There was a problem hiding this comment.
Nitpick: here and in the other parts, we could use assert instead of require.
| require.True(t, ok, "expected PreConfirmedBlock, got %T", env.Update) | ||
| require.Equal(t, "0x1cbe25d9", full.BlockIdentifier) | ||
| require.Equal(t, "PRE_CONFIRMED", full.Status) | ||
| require.NotZero(t, full.Timestamp) |
There was a problem hiding this comment.
Not related to your changes, but could you please add timestamp validation to the PreConfirmedBlock.validate() method? I forgot to add it in my PR.
Also, maybe we could use the PreConfirmedUpdateEnvelope.Validate() method here instead of all these manual assertions. Do as you prefer.
There was a problem hiding this comment.
Added a non-zero timestamp check to the full pre_confirmed Validate. I prefer to keep the manual checks here because they assert exact values, whereas Validate only checks the structure.
| type PreConfirmedUpdateEnvelope struct { | ||
| Update PreConfirmedUpdate | ||
| Update PreConfirmedUpdate | ||
| BlockNumber uint64 |
There was a problem hiding this comment.
The new Validate() feature was implemented in main and is included in this PR, but this PR introduces the new BlockNumber field, which is not being validated. That is hard to do, since the same type is used for both the 'latest' and 'blockNumber' queries on the feeder 'get_preconfirmed_endpoint'. WDYT about adding a new type specific to the 'latest' call, using it in the feeder endpoint validation, and casting it to the PreConfirmedUpdateEnvelope type before returning it?
This is just one way of doing it, but the main point is that we should validate this field somehow.
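One possible shape for that, sketched under assumptions: `latestResponse`, `envelope`, and `decodeLatest` are hypothetical names, and the sketch assumes block_number is always present on the "latest" variant, as the PR summary describes:

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// envelope is a hypothetical stand-in for the shared
// PreConfirmedUpdateEnvelope, reduced to the fields relevant here.
type envelope struct {
	Changed     bool
	BlockNumber uint64
}

// latestResponse is a hypothetical type used only for the "latest" query,
// where block_number is assumed to be mandatory.
type latestResponse struct {
	Changed     *bool   `json:"changed"`
	BlockNumber *uint64 `json:"block_number"`
}

// Validate rejects responses missing either required field.
func (r latestResponse) Validate() error {
	if r.Changed == nil {
		return errors.New(`missing required "changed" field`)
	}
	if r.BlockNumber == nil {
		return errors.New(`missing required "block_number" field`)
	}
	return nil
}

// decodeLatest validates with the latest-specific type, then converts to
// the shared envelope before returning.
func decodeLatest(data []byte) (envelope, error) {
	var resp latestResponse
	if err := json.Unmarshal(data, &resp); err != nil {
		return envelope{}, err
	}
	if err := resp.Validate(); err != nil {
		return envelope{}, err
	}
	return envelope{Changed: *resp.Changed, BlockNumber: *resp.BlockNumber}, nil
}

func main() {
	fmt.Println(decodeLatest([]byte(`{"changed": false, "block_number": 12}`)))
	fmt.Println(decodeLatest([]byte(`{"changed": false}`)))
}
```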
fb605cd to
30ab42e
Compare
30ab42e to
6760788
Compare
81ec31c to
b98f7fe
Compare
| // preConfirmedWire is the flat shape the decoder fills. Discrimination is structural: | ||
| // - "changed": false → NoChange | ||
| // - "changed": true + "timestamp" → Full (new round) | ||
| // - "changed": true + "timestamp" → Full block (new round) | ||
| // - "changed": true, no "timestamp" → Delta | ||
| type PreConfirmedUpdateEnvelope struct { | ||
| Update PreConfirmedUpdate | ||
| type preConfirmedWire struct { | ||
| Changed *bool `json:"changed"` | ||
| BlockNumber *uint64 `json:"block_number"` | ||
| PreConfirmedBlock | ||
| } |
There was a problem hiding this comment.
The type is only used in the function below, so why not declare it inside that function?
| return errors.New("version is required") | ||
| } | ||
| if pb.Timestamp == 0 { | ||
| return errors.New("timestamp is required") |
There was a problem hiding this comment.
We need a small validation test for this one
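A small table-driven check along these lines might do. This is a sketch with a trimmed, hypothetical `preConfirmedBlock`; the `Version == ""` condition is assumed, since only the error message is visible in the diff:

```go
package main

import (
	"errors"
	"fmt"
)

// preConfirmedBlock is a trimmed, hypothetical stand-in for the real type.
type preConfirmedBlock struct {
	Version   string
	Timestamp uint64
}

// validate mirrors the two checks visible in the diff. The version
// condition is an assumption; the timestamp condition is from the PR.
func (pb preConfirmedBlock) validate() error {
	if pb.Version == "" {
		return errors.New("version is required")
	}
	if pb.Timestamp == 0 {
		return errors.New("timestamp is required")
	}
	return nil
}

type validationCase struct {
	name    string
	block   preConfirmedBlock
	wantErr string // empty means no error expected
}

func validationCases() []validationCase {
	return []validationCase{
		{"valid", preConfirmedBlock{Version: "0.14.0", Timestamp: 1}, ""},
		{"zero timestamp", preConfirmedBlock{Version: "0.14.0"}, "timestamp is required"},
		{"missing version", preConfirmedBlock{Timestamp: 1}, "version is required"},
	}
}

// runCases returns the first mismatch between actual and expected errors.
func runCases() error {
	for _, tc := range validationCases() {
		got := ""
		if err := tc.block.validate(); err != nil {
			got = err.Error()
		}
		if got != tc.wantErr {
			return fmt.Errorf("%s: got %q, want %q", tc.name, got, tc.wantErr)
		}
	}
	return nil
}

func main() {
	fmt.Println(runCases())
}
```

In the repository this would presumably live in a _test.go file and use the existing test helpers instead of a hand-rolled runner.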
| // preConfirmedWire is the flat shape the decoder fills. Discrimination is structural: | ||
| // - "changed": false → NoChange | ||
| // - "changed": true + "timestamp" → Full (new round) | ||
| // - "changed": true + "timestamp" → Full block (new round) | ||
| // - "changed": true, no "timestamp" → Delta |
There was a problem hiding this comment.
Thanks for writing thoughtful comments.
| return err | ||
| if raw.Changed == nil { | ||
| return PreConfirmedUpdateEnvelope{}, errors.New( | ||
| "pre_confirmed update: missing required \"changed\" field", |
There was a problem hiding this comment.
I think the error can be just "missing required ..."
| var head *node | ||
| for i, pc := range entries { | ||
| if pc == nil { | ||
| return ChainReader{}, fmt.Errorf("building pre_confirmed chain: entry %d is nil", i) |
There was a problem hiding this comment.
This error can start directly from "entry..."
| } | ||
| if i > 0 && pc.Block.Number != entries[i-1].Block.Number+1 { | ||
| return ChainReader{}, fmt.Errorf( | ||
| "building pre_confirmed chain: non-contiguous block numbers at index %d (%d after %d)", |
| n := c.head | ||
| for i := 0; i < c.length && n != nil; i++ { | ||
| if !yield(n.preconfirmed) { | ||
| return | ||
| } | ||
| n = n.parent |
There was a problem hiding this comment.
No single-letter names; let's reserve those for method receivers.
| for entry := range c.NewestFirst() { | ||
| for _, tx := range entry.Block.Transactions { | ||
| if tx.Hash().Equal(hash) { | ||
| return tx, nil | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Idea for later: have some sort of cache that points you to where to look for a transaction hash 🤔 (and a transaction receipt), but it might be unnecessary.
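A rough sketch of what such a lookup cache could look like, with hypothetical `block`, `txLocation`, and `buildTxIndex` names and string hashes in place of the real types. It maps each transaction hash to its block number and position so that a lookup becomes a single map access instead of a scan:

```go
package main

import "fmt"

// block is a hypothetical, trimmed stand-in for a pre_confirmed entry.
type block struct {
	number   uint64
	txHashes []string
}

// txLocation records where a transaction lives in the chain.
type txLocation struct {
	blockNumber uint64
	index       int
}

// buildTxIndex indexes blocks given oldest first. If the same hash appears
// in more than one block, the newest block wins, which matches the result
// of a newest-first scan.
func buildTxIndex(blocks []block) map[string]txLocation {
	index := make(map[string]txLocation)
	for _, blk := range blocks {
		for i, hash := range blk.txHashes {
			index[hash] = txLocation{blockNumber: blk.number, index: i}
		}
	}
	return index
}

func main() {
	blocks := []block{
		{number: 100, txHashes: []string{"0xa", "0xb"}},
		{number: 101, txHashes: []string{"0xc"}},
	}
	index := buildTxIndex(blocks)
	location, found := index["0xb"]
	fmt.Println(location.blockNumber, location.index, found) // 100 1 true
}
```

Whether the extra bookkeeping pays off depends on how many blocks and transactions a snapshot usually holds, which this sketch does not try to answer.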
b98f7fe to
f1271db
Compare
…uctor, add txn lookup tests
f1271db to
45df31b
Compare

Summary
This PR implements the poller and storage needed to support multiple pre_confirmed blocks. The multi-block pre_confirmed logic is not yet wired to production. A follow-up PR will wire it in and remove prelatest and candidate transactions.
- Add PreConfirmedBlockLatest to the feeder client and propagate it through starknetdata.StarknetData and sync.DataSource. This hits the sequencer's "latest" variant of get_preconfirmed_block, whose response carries a top-level block_number so the caller can discover the server's pre_confirmed tip without tracking the height itself. The shared PreConfirmedUpdateEnvelope now decodes that field.
- Add a sync/preconfirmed package with two pieces:
  - ChainStorage: an immutable, lock-free linked list of pre_confirmed blocks anchored above the canonical head. Single writer (the poller) / many concurrent readers via atomic.Pointer. Exposes SnapshotForHead (bounded by BlockHashLag) for consumers, UnsafeSnapshot for the poller, and ApplyUpdate / AdvanceTo for chain evolution. Handles bootstrap, extension, in-chain replacement (Delta or new round), and realignment when head moves or reverts.
  - Poller: a single-goroutine tick loop that advances storage to head, polls the server's latest, backfills any gap below it (re-polling the previous tip with delta hints to capture its final view), then applies the latest update. NoChange / Delta / Full variants are all dispatched.
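The single-writer / many-readers pattern described for ChainStorage can be sketched roughly as follows. All names here (`chainStorage`, `snapshot`, `appendBlock`, `load`) are hypothetical simplifications; the real type also handles replacement, realignment, and head-bounded views:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// node is immutable once published: readers holding an older snapshot keep
// seeing the same parent links.
type node struct {
	number uint64
	parent *node
}

// snapshot is the value readers obtain: a head pointer plus a length.
type snapshot struct {
	head   *node
	length int
}

// chainStorage publishes snapshots through an atomic pointer, so readers
// never take a lock.
type chainStorage struct {
	current atomic.Pointer[snapshot]
}

// appendBlock must only be called by the single writer. It builds a new
// snapshot on top of the current one and publishes it atomically.
func (s *chainStorage) appendBlock(number uint64) {
	next := &snapshot{length: 1}
	var parent *node
	if old := s.current.Load(); old != nil {
		parent = old.head
		next.length = old.length + 1
	}
	next.head = &node{number: number, parent: parent}
	s.current.Store(next)
}

// load returns the latest published snapshot by value; safe for any reader.
func (s *chainStorage) load() snapshot {
	if cur := s.current.Load(); cur != nil {
		return *cur
	}
	return snapshot{}
}

func main() {
	var storage chainStorage
	storage.appendBlock(100)
	view := storage.load()   // a reader takes its snapshot
	storage.appendBlock(101) // the writer advances afterwards
	fmt.Println(view.length, storage.load().length) // 1 2
}
```

The earlier view stays valid after the writer advances because nodes are never mutated, only superseded.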