Streaming exchange by blp · Pull Request #6014 · feldera/feldera · GitHub

Streaming exchange #6014

Open

blp wants to merge 29 commits into main from exchange

Conversation

@blp
Member

@blp blp commented Apr 9, 2026

Please read the individual commits.

I'm currently working to figure out why one test fails (see final commit). That's why this is being submitted as a draft PR. After I fix it, I'll mark it ready for review.

@blp blp requested a review from ryzhyk April 9, 2026 22:27
@blp blp self-assigned this Apr 9, 2026
@blp blp added the labels DBSP core (Related to the core DBSP library), ft (Fault tolerant, distributed, and scale-out implementation), performance, rust (Pull requests that update Rust code), and multihost (Related to multihost or distributed pipelines) on Apr 9, 2026
@blp blp marked this pull request as draft April 9, 2026 22:28

@mythical-fred mythical-fred left a comment


Draft-level comment: this is getting hard to reason about as one unit because the core semantic change (streaming exchange / shard-accumulate) is mixed with a long tail of preparatory cleanups, instrumentation tweaks, and exchange refactors. Before this is ready, I would strongly consider splitting it into a stack: prep refactors/tests first, then the streaming-exchange semantic change, then shard-accumulate on top. That keeps the design review focused on the actual behavioral change instead of making the reviewer re-prove every cleanup commit at the same time.


@blp
Member Author

blp commented Apr 9, 2026

@ryzhyk I think I'll need some advice, please see the final commit "showing why the proptest_max_retain_values_test failure is surprising" (which obviously isn't meant to merge). But only when you have a chance; I'll be away from keyboard for a few days.

@ryzhyk
Contributor

ryzhyk commented Apr 12, 2026

@ryzhyk I think I'll need some advice, please see the final commit "showing why the proptest_max_retain_values_test failure is surprising" (which obviously isn't meant to merge). But only when you have a chance; I'll be away from keyboard for a few days.

That test fails for me even if I comment out accumulate_integrate_trace_retain_values_top_n. The failure doesn't seem related to lateness and bounds. I haven't yet figured out what's going on here.

@blp
Member Author

blp commented Apr 13, 2026

That test fails for me even if I comment out accumulate_integrate_trace_retain_values_top_n. The failure doesn't seem related to lateness and bounds. I haven't yet figured out what's going on here.

I'll take another look, then.

@blp
Member Author

blp commented Apr 14, 2026

I believe I've found the underlying cause of the test failures.

The test in question does an aggregation of two different streams and compares them:

  • The first stream is pre-sharded because it comes from add_input_indexed_zset, which has an internal call to dyn_shard. Thus, when aggregate internally calls dyn_shard_accumulate, it accumulates the sharded stream via dyn_accumulate().
  • The second stream is the output of the map_index() operator; while it is inherently sharded in this case, that is not visible to map_index() and thus it is not marked as sharded. When aggregate internally calls dyn_shard_accumulate, it uses streaming exchange to shard and then accumulate.

Since the two aggregations are over different source streams, and those source streams are not synchronized (streaming exchange runs at an unpredictable rate), sometimes they get out of sync and cause a failure.

The solution is not obvious to me.

@blp
Member Author

blp commented Apr 14, 2026

I pushed a new final commit that demonstrates the issue in more detail.

@blp
Member Author

blp commented Apr 14, 2026

I fixed the problem (thanks @ryzhyk)

@blp blp marked this pull request as ready for review April 14, 2026 22:27

@mythical-fred mythical-fred left a comment


One blocker.

The branch is now ready for review, so the commit history has to be cleaned up before merge. There are still at least two hard-blocking commit subjects in the stack: remove integrals and shard_accumulate: wip. Both violate the commit-message rule, and the latter is explicitly WIP history. Please squash/reword the branch into clean, mergeable commits before asking for final review.

@blp
Member Author

blp commented Apr 15, 2026

The branch is now ready for review, so the commit history has to be cleaned up before merge. There are still at least two hard-blocking commit subjects in the stack: remove integrals and shard_accumulate: wip. Both violate the commit-message rule, and the latter is explicitly WIP history. Please squash/reword the branch into clean, mergeable commits before asking for final review.

Those are good suggestions but, if we are to implement them, @ryzhyk is probably the right person to do it.

Contributor

@ryzhyk ryzhyk left a comment


The main feedback we discussed offline: it makes sense to merge Accumulator into ExchangeReceiver.

{
type Output = (T::Output, Duration);

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Contributor


Very clever. I didn't know how to do this. Could you add a comment explaining how this works?

Member Author

@blp blp Apr 15, 2026


Good idea. I added this comment on Timed<T>:

    /// An async task that measures its runtime.
    ///
    /// This uses the same wrapper technique as [tokio_metrics::Instrumented] or
    /// [tracing::Instrument]: by implementing [Future] manually, it wraps each
    /// poll with elapsed time measurement.  When the future eventually
    /// completes, it outputs the wrapped future's output value plus the total
    /// elapsed time.
    ///
    /// [tokio_metrics::Instrumented]: https://docs.rs/tokio-metrics/latest/tokio_metrics/struct.Instrumented.html
    /// [tracing::Instrument]: https://docs.rs/tracing/latest/tracing/trait.Instrument.html

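As a standalone illustration of that wrapper technique, here is a hedged sketch, not the PR's actual `Timed<T>` code: it boxes the inner future to avoid unsafe pin projection, and `TimedSketch`, `noop_waker`, and `run_timed` are invented names. The key idea is the same, though: implement `Future` by hand so that every call to `poll` is bracketed by an `Instant` measurement, and report the sum when the inner future completes.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use std::time::{Duration, Instant};

/// Illustrative stand-in for `Timed<T>`: boxes the inner future (so no
/// unsafe pin projection is needed) and sums the time spent in each poll.
struct TimedSketch<T> {
    inner: Pin<Box<T>>,
    elapsed: Duration,
}

impl<T: Future> Future for TimedSketch<T> {
    type Output = (T::Output, Duration);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `TimedSketch` is Unpin (all its fields are), so `&mut Self` is safe.
        let this = self.get_mut();
        let start = Instant::now();
        let result = this.inner.as_mut().poll(cx);
        this.elapsed += start.elapsed();
        match result {
            Poll::Ready(output) => Poll::Ready((output, this.elapsed)),
            Poll::Pending => Poll::Pending,
        }
    }
}

/// A waker that does nothing, enough to poll a future by hand.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

fn run_timed() -> (i32, Duration) {
    let mut timed = TimedSketch {
        inner: Box::pin(async { 21 * 2 }),
        elapsed: Duration::ZERO,
    };
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    // An async block with no awaits completes on its first poll.
    match Pin::new(&mut timed).poll(&mut cx) {
        Poll::Ready(output) => output,
        Poll::Pending => unreachable!(),
    }
}

fn main() {
    let (value, elapsed) = run_timed();
    assert_eq!(value, 42);
    println!("value={value}, polled for {elapsed:?}");
}
```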
fn fixedpoint(&self, _scope: Scope) -> bool {
false
}
fn is_input(&self) -> bool {
Contributor


I think this is a good fix. This feature is currently only used by the string interner.

where
F: FnMut(T),
{
let receiver = Runtime::worker_index();
Contributor


I suspect this was an optimization to avoid accessing thread-locals on each call. But maybe it's not an important one, especially on Linux.

Comment thread crates/dbsp/src/circuit/dbsp_handle.rs
sender_start: senders.start.try_into().unwrap(),
sender_end: senders.end.try_into().unwrap(),
data_len: data.len().try_into().unwrap(),
struct ExchangeClient {
Contributor


There is an old architectural diagram below showing ExchangeSenders and ExchangeReceivers. Can we extend it to explain the role of ExchangeClient/ExchangeServer/Listeners as well?

Member Author


I added this:

/// # Multihost
///
/// Workers can reside in different processes that need to exchange data over a
/// network.  There is little reason to do this if the processes are on the same
/// host, so we call this "multihost" exchange.  We tend to speak of processes
/// and hosts interchangeably in this context.
///
/// Multihost exchange works mostly as shown in the diagram above, except that
/// there is a network in the middle.  Suppose that we have two hosts with two
/// workers each, even though they ordinarily would have more than that.  Each
/// host listens on a network port with a single [ExchangeListener] and
/// constructs one [ExchangeClient] for each remote host.  Data destined to a
/// worker on the same host uses local mechanisms; data destined to a worker on
/// a different host flows through an appropriate `ExchangeClient` to the remote
/// `ExchangeListener` to the correct worker.
///
/// The diagram below shows how ExchangeSender in worker 1 (ES1) sends data to
/// the ExchangeReceivers (ERs) for other workers.  Data for workers 1 and 2
/// stays on the same host, so it goes directly.  Data for workers 3 and 4
/// passes through the local ExchangeClient (EC1) to the remote ExchangeListener
/// (EL2), which delivers it to the remote ExchangeReceivers.
///
/// ```text
///     ┌───┐      ┌───┐
///  ──►│ES1│──┬──>│ER1│   Worker 1
///     └───┘  │   └───┘
///            │
///     ┌───┐  │   ┌───┐
///  ──►│ES2│  ├──>│ER2│   Worker 2
///     └───┘  │   └───┘
///            ↓
///          ┌───┐
///          │EC1│
///          └───┘
///            │
/// HOST 1     │
/// ───NETWORK CONNECTION───────────────────────────────────────────
/// HOST 2     │
///            ↓
///          ┌───┐
///          │EL2│
///          └───┘
///            │
///     ┌───┐  │   ┌───┐
///  ──►│ES3│  ├──>│ER3│   Worker 3
///     └───┘  │   └───┘
///            │
///     ┌───┐  │   ┌───┐
///  ──►│ES4│  └──>│ER4│   Worker 4
///     └───┘      └───┘
/// ```


impl ExchangeClient {
async fn new(remote_address: SocketAddr, remote_workers: &Range<usize>) -> Self {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
Contributor


I don't see anything preventing us from buffering unbounded in-memory data in this channel.
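A bounded channel is the standard remedy for this: once its capacity is reached, `send` blocks and backpressure propagates to the producer instead of memory growing without bound. A minimal stdlib sketch of the idea, using `std::sync::mpsc::sync_channel` for illustration rather than the tokio channel in the actual code (`pump` is an invented name):

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

// Bounded-channel sketch: `send` blocks once 4 batches are queued, so
// backpressure reaches the producer instead of buffering unboundedly.
fn pump() -> usize {
    let (tx, rx) = sync_channel::<Vec<u8>>(4);
    let producer = thread::spawn(move || {
        for i in 0..16u8 {
            tx.send(vec![i; 1024]).unwrap(); // blocks while the queue is full
        }
        // Dropping `tx` closes the channel, ending the receive loop below.
    });
    let mut received = 0;
    while let Ok(_batch) = rx.recv() {
        received += 1;
    }
    producer.join().unwrap();
    received
}

fn main() {
    let received = pump();
    assert_eq!(received, 16);
    println!("received {received} batches");
}
```

The tokio equivalent would be `tokio::sync::mpsc::channel(capacity)` with an async `send().await` that suspends the sender task instead of blocking a thread.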

fn is_ready(&self) -> bool {
match self.exchange.kind {
ExchangeKind::Sync => self.exchange.ready_to_receive(self.worker_index),
ExchangeKind::Stream => self
Contributor


This is another reason why moving the accumulator inside the operator is a good idea. The operator is always ready until flush is called, so steps will complete without waiting for remote inputs. Only when executing an eval following a flush will the operator have to wait for all remote inputs to arrive.

@blp
Member Author

blp commented Apr 18, 2026

@ryzhyk

I revised the series. The lead-up to the new code is mostly unchanged, but:

  • I removed the commit "Add support for streaming exchange".
  • I added new commits:
    • "Make Exchange more easily used by code outside its module."
    • "Implement optimized sharded accumulator operator."
    • "Prevent memory exhaustion due to infinite queuing."

There's a comment in the new code for ShardedAccumulatorReceiver that needs some thought:

    fn ready(&self) -> bool {
        // This operator does not fit well into the DBSP evaluation model.  It
        // only has anything useful to contribute when it has received a flush.
        // At any other time, it can immediately evaluate to `None`.  However:
        //
        // - If it only reports that it is ready when it has received a flush,
        //   then it will prevent the step from completing until it has.  This
        //   will deadlock because the flush will never be received (since
        //   generally it takes more than one step to flush).
        //
        // - If it does what it does here, and always reports that it is ready,
        //   then this could cause livelock, spinning uselessly with 100% CPU,
        //   if there's nothing for the circuit to do while data transmits.
        //
        // I don't know the right solution.
        true
    }
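One hedged direction out of this dilemma (a standalone sketch, not the PR's solution) is to let the receiver park on a flush notification instead of reporting itself ready: the scheduler blocks cheaply until the flush actually arrives, avoiding both the spin and the deadlock. `FlushSignal` and `demo` below are invented names; the sketch uses a std `Condvar`, where the real code would more likely use an async primitive such as `tokio::sync::Notify`.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

// Hypothetical flush signal: a flag plus a Condvar lets a waiter park
// until flush is signaled, instead of spinning at 100% CPU (livelock)
// or holding up a circuit step forever (deadlock).
struct FlushSignal {
    flushed: Mutex<bool>,
    cond: Condvar,
}

impl FlushSignal {
    fn new() -> Self {
        FlushSignal { flushed: Mutex::new(false), cond: Condvar::new() }
    }

    fn signal_flush(&self) {
        *self.flushed.lock().unwrap() = true;
        self.cond.notify_all();
    }

    fn wait_for_flush(&self) {
        let mut flushed = self.flushed.lock().unwrap();
        while !*flushed {
            // Parks the thread; no CPU is burned while data is in transit.
            flushed = self.cond.wait(flushed).unwrap();
        }
    }
}

fn demo() -> &'static str {
    let signal = Arc::new(FlushSignal::new());
    let waiter = {
        let signal = Arc::clone(&signal);
        thread::spawn(move || {
            signal.wait_for_flush();
            "flush observed"
        })
    };
    thread::sleep(Duration::from_millis(10)); // simulate data still in transit
    signal.signal_flush();
    waiter.join().unwrap()
}

fn main() {
    assert_eq!(demo(), "flush observed");
    println!("ok");
}
```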

I also see an intermittent failure in operator::dynamic::recursive::test::issue4168. It might just mean that test runs slowly sometimes, since it has a hardcoded timeout, but it might be something else, I dunno yet.

Contributor

@ryzhyk ryzhyk left a comment


The new changes look good. I'll think about a solution for the busy-waiting problem.

npeers,
global_offset: 0,
sender_offsets: repeat_n(0, npeers).collect(),
spines: VecDeque::from([(npeers, Spine::new(factories))]),
Contributor


We discussed this offline. This is worth documenting. I wasn't able to figure out why this needs to be a queue until you explained it to me.

Member Author


I pushed a commit that should make this more obvious.

// We are called in an async context, so blocking here is more costly
// than one might expect; depending on how it shows up in profiles,
// this might be a good optimization target.
spine.insert(batch);
Contributor


Maybe we should look into creating an async-friendly version of spine.insert.


blp and others added 29 commits April 23, 2026 16:46
Until now, DBSP has measured operator runtimes by the amount of wall-clock
time from when they are started to when they complete.  This is an
overestimate, because operators are asynchronous and can yield mid-run.
This commit changes the scheduler to instead instrument each individual
invocation of an operator and then sum the times to get an accurate
runtime.

Previously it was possible to avoid profiling overhead, but I think in
practice we always enabled it.

Signed-off-by: Ben Pfaff <blp@feldera.com>
No particular reason, I was just reading code and confused about the bounds
and it turned out they weren't needed.

Signed-off-by: Ben Pfaff <blp@feldera.com>
The trait documentation says so:

```rust
    /// Returns `true` if `self` is an input operator.
    ///
    /// An input operator feeds new data into the circuit. Examples are
    /// the `Input` and `Generator` operators.
    fn is_input(&self) -> bool {
        false
    }
```

I don't know why they did not return `true` for these input operators
before.  I don't know what the consequences are for returning the wrong
value (this is not in response to some bug I discovered).

Signed-off-by: Ben Pfaff <blp@feldera.com>
I thought maybe it was possible to have an operator indicate that it was
already flushed immediately and then never get evaluated.  This is not the
case.

Plus a few other minor comment improvements.

Signed-off-by: Ben Pfaff <blp@feldera.com>
As far as I can tell, `eval` and `eval_owned` were the same except for
a clone() call, so this simplifies things.

Signed-off-by: Ben Pfaff <blp@feldera.com>
This will enable adding a field that can't be cloned, in an upcoming
commit.

Signed-off-by: Ben Pfaff <blp@feldera.com>
Before commit e60a8a3 ("[dbsp] Use new protocol for exchanging data
among hosts."), each remote exchange RPC waited for the data to be picked
up out of its mailboxes before returning.  That commit changed that:
sending data no longer waits for a reply from the remote host (there is
in fact no reply), but nothing was added to prevent a sender from
overwriting a mailbox whose previous contents had not yet been picked up.

This fixes the problem.  Instead of introducing a wait on the RPC sender
side, it introduces it on the receiver side.  The receiver waits until the
mailboxes it is writing to are empty before writing to them.

I noticed that we have a 2-d array of notifiers for this purpose but we
were going to wait on all of the notifiers in the column for a given
sender in sequence.  It is more efficient in time and space to simply
have one per sender instead, so I made that change.  At the same time,
I noticed that Broadcast could simply use that same Notify instead of
adding its own via registering a callback, so I made that simplification
too.

sender_notifies is now redundant with sender_callbacks, but it would
involve changing a lot of code to switch from one to the other, so this
commit does not make that change.

Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: Ben Pfaff <blp@feldera.com>
In practice these were always supplied as Runtime::worker_index() and it
didn't make sense to supply any other value.

Signed-off-by: Ben Pfaff <blp@feldera.com>
Until now, the code for exchange has always bound its own socket to listen
for incoming connections.  This works fine in what is our common case,
where the listening socket is a well-known port in a container that doesn't
have anything else running.  But for tests, or for situations where the
host running the pipeline has other software running and doesn't reserve
specific ports for the pipeline, it's more reliable to bind all the
listening ports (probably letting the kernel choose a port number), then
start the pipelines with the port numbers known in advance.  This commit
allows for that.  The initial use case will be unit tests, in upcoming
commits.

Signed-off-by: Ben Pfaff <blp@feldera.com>
By moving this out of `Exchange` into an argument for try_receive_all(),
we move it closer to where it's actually needed, and it's no longer
necessary to box it.

Signed-off-by: Ben Pfaff <blp@feldera.com>
This separation was painful and I'm glad to get rid of it.

Signed-off-by: Ben Pfaff <blp@feldera.com>
The word `Service` wasn't doing anything here.

Signed-off-by: Ben Pfaff <blp@feldera.com>
Until now, multihost exchange has accumulated the data sent from one host
to another until all of it is available, then sent it all together.  This
means that serialization and deserialization was delayed until all the data
was available.  This commit changes the code to instead send data as it
becomes available.

This commit also makes more use of async functions in the exchange code.

This also adds unit tests for multihost exchange.

Signed-off-by: Ben Pfaff <blp@feldera.com>
We introduce the `shard_accumulate` operator, which is functionally equivalent to
`.shard().accumulate()`, but allows a more efficient distributed implementation that
overlaps communication and computation.

Until now every exchange operator behaved as a barrier at each step of the circuit:
operators downstream of an exchange could only be evaluated after the receiving side
of the exchange has received inputs from all peers.

Most of the time this is unnecessary: exchange, and in particular `shard`, which is
the most common application of exchange, is almost always followed by an accumulator
and an integral (the reason we shard is to create indexed integrals for operators like
join, aggregate, and distinct). We therefore only need a barrier when committing a
transaction, i.e., after a `flush`. This leads to an optimized exchange that does all
the sending and receiving in the background, only stopping to wait for input from peers
post-flush.

For this to work well, it makes sense to combine exchange with an accumulator to
accumulate inputs in a spine (which handles merging small updates in the background
and pushing state to disk).

This commit introduces the `shard_accumulate` operator, which is currently implemented
as `.shard().accumulate()`, and replaces individual calls to `shard` and `accumulate`
with `shard_accumulate` where appropriate.

Upcoming commits will replace the naive implementation with an optimized async
implementation.

TODOs:
- This optimization does not yet apply to adaptive joins, which don't use the
  `shard` operator. The approach applies there as well, but it needs more work.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Testing intermediate results often failed because only the final result
for each aggregation is predictable, whereas the number of steps needed
to arrive at the final results varies.

Thanks to Leonid Ryzhyk for helping me find the solution.

Signed-off-by: Ben Pfaff <blp@feldera.com>
This refactors the exchange code slightly to make it usable by the new
sharded accumulator implementation that is coming up in a later commit:

* Rename Receiver to ExchangeDelivery.

* Rename Clients to ExchangeClients.

* Change ExchangeDirectory from a typename into a properly encapsulated
  type.

* Give ExchangeClients a more convenient constructor.

Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: Ben Pfaff <blp@feldera.com>
This is a theoretical concern.

Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: Ben Pfaff <blp@feldera.com>
Make `cargo test -- --nocapture` less noisy.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Signed-off-by: Ben Pfaff <blp@feldera.com>
It's easy enough to get a batch out of an `Arc` if it has a reference count
of 1, and we always ultimately need it in an `Arc`, so just always take
the argument in one.

The new function Spine::maybe_flush_batch will have another user in an
upcoming commit.

Signed-off-by: Ben Pfaff <blp@feldera.com>
…r rx.

Rxq::deliver(), which is called holding an exclusive lock, could both do
I/O to spill a batch to storage and block waiting for merging to occur
in the spine.  This is fine when evaluating a normal operator but it is
bad on the sharded accumulator delivery path because it will force any
other sender to block until it is finished also.

This commit adds a method Spine::insert_without_blocking which does not
include either of those behaviors and modifies the sharded accumulator
delivery path to use the new method.  It adds code to do the I/O, when
necessary, before calling the new method, and it adds a call to a new
method Spine::backpressure_wait to do all the waiting for merging in one
place just before receive (which might even be more efficient).

This commit unfortunately also includes a lot of chaff to deal with turning
Spine::insert into an async function.  This is necessary because we now
need to wait on it from an async context, which means the Condvar, which
is not async-capable, needs to become a Notify, which only works in an
async context.  Spine::insert indeed runs inside an async context most
of the time (operator evaluation occurs inside one) but one can't enter
such a context nested (`block_on` and such will panic), so instead one
must thread async all the way in.  Ordinarily I'd try to factor that
change out into a separate commit, but it was rather tangled in this
case.

Signed-off-by: Ben Pfaff <blp@feldera.com>
Until now, the spine has started a merger task for every level when it is
created.  This was OK for the way we used the spine at the time: there
were only a few of them and they were all created when we created the
circuit.  Now, however, the sharded accumulator creates a spine for every
transaction, so the cost of creating it matters.  This commit changes
the spine to only start a single merger task and use it for all of the
merges at all the levels.

This improves runtime in release mode for the issue4168 test from
11.4 seconds to 3.3 seconds.

Signed-off-by: Ben Pfaff <blp@feldera.com>