Orphan bead handling wrt DB #309
Conversation
mcelrath
left a comment
There was a problem hiding this comment.
This PR creates a circular dependency, Braid <-> DBHandler.
In main.rs:517 around AddBeadStatus::BeadAdded we are writing new beads to the DB on a successful extend (which will automatically also add orphans to the braid if possible). So all we really need to do is update main.rs to save not only the new bead, but instead all new beads in the braid since the last save.
For instance, Before calling braid_data.extend(&bead) at main.rs:477, save let braid_prev_len = braid_data.beads.len() and then when writing to the DB, write all beads between braid_prev_len and the new braid_data.beads.len().
There was a problem hiding this comment.
Pull request overview
This PR addresses orphan bead handling and database persistence by decoupling the Braid structure from direct database operations. The changes simplify the architecture by removing the ibd_manager module and moving Initial Block Download (IBD) logic into the peer_manager, while implementing proper orphan bead persistence when all parents become available.
Key changes include:
- Removal of
ibd_managermodule and migration of IBD state management topeer_manager - Decoupling
SwarmHandlerfromBraidto reduce lock contention and architectural complexity - Implementation of orphan bead detection and persistence by tracking braid height changes during extension
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 18 comments.
Show a summary per file
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
mcelrath
left a comment
There was a problem hiding this comment.
Overall, this seems overly complex to me. I don't understand why prepare_bead_tuple_data takes so many arguments, its job is just to make json tuples for insert. Why does it take so many arguments?
Likewise InsertBeadSequentially should probably just be called InsertBeads and it should take a Vec<Bead>. Where they need to get things like BeadIdx, why just not pass in a reference to the Braid?
92717d9 to
35ca7bc
Compare
493d099 to
a35e079
Compare
a35e079 to
64690f7
Compare
64690f7 to
3ea09ef
Compare
zaidmstrr
left a comment
There was a problem hiding this comment.
I reviewed this PR and found some areas of improvement and gave some suggestions. You are free to discuss them below.
fdc819d to
633c8fc
Compare
633c8fc to
e0109fc
Compare
5d2e2bd to
c17b3e7
Compare
ec8668f to
e9b1596
Compare
b754a6b to
9116335
Compare
…hecks into peer manager module changing insert bead function signature changing verbosity of ibd logs
…rebase removing typo removing not needed file
chore: rebasing over dev resolving conflicts in lockfile
…ntinue in db_handlers Currently the buffers being kept for ibd should be drained before retry and replaced at each retry and similarly if the ibd gets completed the in-memory data can be dropped. Removing the continue branches from db_handlers in place of concrete fallible error types in case any of the persistance mapping transaction is rollbacked or in case of error preventing inconsistent and unnecessary further iteration of remaining beads in case of both orphan and non-orphand beads.
…ms,logs This commit removes the Arc<Mutex<T>> locking around Connection Pool from sqlx since the inner type is already Arc<T> which further is wrapped at granular level to provide interior mutability thus is both Send+Sync and it removes the logging loop in batch insertion which was being used for testing and adding the check for duplicate bead that are received during the floodsub propagation
… in case of failure or timeout This adds a ibd_complete atomic flag share between stratum server in main swarm loop that will prevent the mining connections from downstream miners until IBD is completed. It also modifies peer_manager module to add request monitoring in case of timeout or in-between peer is disconnected .
…nsertions This removes the holding of locks in bead_sync loop and reducing the contention between locks, along with that it also improves upon the existing bulk insertion method in which if the threshold of BATCH_SIZE is reached then it chunks into min batch size and insert each chunk. It also changes the prototyping of value serialization function in db_handlers.rs to reduce the round-trip between serde and concrete value and removing unused params of functions such as in SwarmHandler and removing oneshot for sending intermediate data to peer manager required for bead_sync.
… overhead We already had batches based bead fetching on the basis of beadhashes, but the beadhashes were not batched leading to potential memory overhead during the initial `GetBeadsAfter` call. In this we limit the call of beadhashes to batch size and changes in `Codec` implementation to take the buffer instead of usize::MAX to the framesize to limit the req/resp call. chore(ibd): Redoing some comments and updating test in db handler
9116335 to
7832048
Compare
zaidmstrr
left a comment
There was a problem hiding this comment.
I found some possible issues that needs to taken a look. While the reviewing process is still going on from my side. Until then you can look at the issues I have found.
|
|
||
| /// Record an in-flight IBD request to `peer_id`. Called immediately after | ||
| /// `bead_sync.send_request(...)` returns its `OutboundRequestId`. | ||
| pub fn set_ibd_inflight(&mut self, peer_id: PeerId, request_id: OutboundRequestId) { |
There was a problem hiding this comment.
The function has no guard to hold the correct invariant. Thus, can include the possible race conditions. Also, this is called in many places inside main.rs under independent code blocks.
| // Try to retrieve the parent | ||
| //This is not required if a bead exists in DB it would already been extended to local braid as well | ||
| // Parent not found and can't be retrieved | ||
| self.orphan_beads.push(bead.clone()); |
There was a problem hiding this comment.
The orphan_beads Vec is an unbounded buffer and has no checks on the capacity limits. Any malicious peer can send any number of malformed beads with unknown parents and can crash the buffer with OOM.
| let mut ibd_initiated = false; | ||
|
|
||
| // Flag to indicate IBD completion - prevents miner connections until synced | ||
| let ibd_complete = Arc::new(AtomicBool::new(false)); |
There was a problem hiding this comment.
The variable is never set to false after being used or setting it to true. Which can introduce unexpected behaviour later in the code.
| // Trigger IBD when peer count threshold is reached | ||
| { | ||
| let peer_manager = peer_manager_arc.write().await; | ||
| if !ibd_initiated && peer_manager.num_connected_peers() >= MIN_PEERS_FOR_IBD { | ||
| info!( | ||
| peer_count = peer_manager.num_connected_peers(), | ||
| threshold = MIN_PEERS_FOR_IBD, | ||
| "Peer threshold reached, initiating IBD" | ||
| ); | ||
| match swarm_command_sender.send(SwarmCommand::InitiateIBD).await { | ||
| Ok(_) => { | ||
| ibd_initiated = true; | ||
| info!("IBD trigger sent based on peer count"); | ||
| } | ||
| Err(error) => { | ||
| error!(error=?error, "Failed to send IBD initiation command"); | ||
| } | ||
| } | ||
| } | ||
| } |
| let next_batch_offset = match peer_manager_arc | ||
| .write() | ||
| .await | ||
| .next_batch_offset(peer, IBD_BATCH_SIZE) | ||
| { |
| let req_id = swarm.behaviour_mut().request_beads(peer, &pruned_beads[next_batch_offset..(next_batch_offset+IBD_BATCH_SIZE)].to_vec()); | ||
| peer_manager_arc.write().await.set_ibd_inflight(peer, req_id); |
| let req_id = swarm.behaviour_mut().request_beads(peer, &pruned_beads[next_batch_offset..].to_vec()); | ||
| peer_manager_arc.write().await.set_ibd_inflight(peer, req_id); |
| // Initiate `GetBead` request cycle before taking the peer_manager lock, | ||
| // then batch handle_update_incoming + set_ibd_inflight into one acquire. | ||
| let req_id = if pruned.len() <= IBD_BATCH_SIZE { | ||
| swarm.behaviour_mut().request_beads(peer, &pruned) | ||
| } else { | ||
| swarm.behaviour_mut().request_beads(peer, &pruned[0..IBD_BATCH_SIZE].to_vec()) | ||
| }; | ||
| // Initiating `GetBead` request cycle | ||
| if pruned_ref.len() <= IBD_BATCH_SIZE{ | ||
| swarm.behaviour_mut().request_beads(peer, &pruned_ref); | ||
| } | ||
| else{ | ||
| swarm.behaviour_mut().request_beads(peer, &pruned_ref[0..IBD_BATCH_SIZE].to_vec()); | ||
| { | ||
| let mut peer_manager = peer_manager_arc.write().await; | ||
| peer_manager.handle_update_incoming(peer, pruned); | ||
| peer_manager.set_ibd_inflight(peer, req_id); |
| let mut peer_manager = peer_manager_arc.write().await; | ||
| let active = peer_manager.take_ibd_inflight_if_matches(&peer, request_id); | ||
| if active { | ||
| peer_manager.handle_update_retry_count(peer); |
| let mut peer_manager = peer_manager_arc.write().await; | ||
| let was_ibd = peer_manager.take_ibd_inflight_for_peer(&peer_id); | ||
| if was_ibd { | ||
| peer_manager.handle_update_retry_count(peer_id); |
| pub fn load_from_config_file(path: &str) -> Result<BraidpoolConfig, ConfigError> { | ||
| let contents = fs::read_to_string(path).map_err(|e| ConfigError::TomlParseError { | ||
| error: e.to_string(), | ||
| path: path.to_string(), | ||
| })?; |
| { | ||
| let mut peer_manager = peer_manager_arc.write().await; | ||
| peer_manager.handle_update_ibd_peer_tips(peer, tips.0); | ||
| peer_manager.set_ibd_inflight(peer, req_id); | ||
| } |
| if retry_cnt > 0 { | ||
| peer_manager.handle_update_retry_count(lowest_latency_peer); | ||
| } |
Tests binding fixed ports (3353, 3356, 3357, 3358, 5050) caused AddrInUse failures when tests ran concurrently or a previous test left a server listening. This was a known flake documented across multiple PRs (braidpool#309, braidpool#474). Add bound_addr_tx: Option<oneshot::Sender<SocketAddr>> to run_stratum_service. After binding, the actual address is sent through the channel so tests can connect to the real port. All 5 tests now use port 0 (OS-assigned) and await the oneshot receiver before connecting, no sleep needed, no race window. Production call in main.rs passes None.
Tests binding fixed ports (3353, 3356, 3357, 3358, 5050) caused AddrInUse failures when tests ran concurrently or a previous test left a server listening. This was a known flake documented across multiple PRs (braidpool#309, braidpool#474). Add bound_addr_tx: Option<oneshot::Sender<SocketAddr>> to run_stratum_service. After binding, the actual address is sent through the channel so tests can connect to the real port. All 5 tests now use port 0 (OS-assigned) and await the oneshot receiver before connecting, no sleep needed, no race window. Production call in main.rs passes None.
…477) * fix(tests): replace hardcoded stratum ports with OS-assigned port 0 Tests binding fixed ports (3353, 3356, 3357, 3358, 5050) caused AddrInUse failures when tests ran concurrently or a previous test left a server listening. This was a known flake documented across multiple PRs (#309, #474). Add bound_addr_tx: Option<oneshot::Sender<SocketAddr>> to run_stratum_service. After binding, the actual address is sent through the channel so tests can connect to the real port. All 5 tests now use port 0 (OS-assigned) and await the oneshot receiver before connecting, no sleep needed, no race window. Production call in main.rs passes None. * fix: use actual bound port in stratum endpoint logging server_endpoints and the warning log were using self.stratum_config.port which shows 0 when OS port assignment is used. Switch to bound_addr.port() so logs always show the real listening port.

orphanbeads were not being persisted in DB leading to foreign key violation and causing error in subsequent bead propagation .Braidwith thedb_handlerusingdb_txand thus persisting the beads if any present inorphan bufferas soon as all of its parents have been received .BeadSyncfor the beads whose parents have not yet been received i.e. it is orphan bead which was redundant in nature causing unnecessary utilization of bandwidth .The above was initial though but over the discussions me @mcelrath arrived at a conclusion that tight coupling b/w DB and
Braidis not required and must be kept as separated as possible .Braidinto main thread only reducing the causes as well as checking of potential race conditions arising eventually leading to deadlocks.Timestampbased checking forIBDfor the purpose of connecting DAG after receiving beads from sync nodes as explained under Refactor IBD #306 andibd_managermoving intopeer_manager.stratumcomponent instead depending dynamically overOrphan-Occupancyabove certain thresholdalphadefined as latency param which will eventually done in forth-coming PR and adding batched insertions during IBD.Arc<Mutex<T>>wrapping around db connection pool to avoid unnecessary locking along with the clearing IBD buffers in case of stale or dropped and retried requests and clearing buffer after ibd is completed.