Conversation
mythical-fred
left a comment
Draft-level comment: this is getting hard to reason about as one unit because the core semantic change (streaming exchange / shard-accumulate) is mixed with a long tail of preparatory cleanups, instrumentation tweaks, and exchange refactors. Before this is ready, I would strongly consider splitting it into a stack: prep refactors/tests first, then the streaming-exchange semantic change, then shard-accumulate on top. That keeps the design review focused on the actual behavioral change instead of making the reviewer re-prove every cleanup commit at the same time.
@ryzhyk I think I'll need some advice; please see the final commit "showing why the proptest_max_retain_values_test failure is surprising" (which obviously isn't meant to merge). But only when you have a chance; I'll be away from keyboard for a few days.
That test fails for me even if I comment out
I'll take another look, then. |
I believe I've found the underlying cause of the test failures. The test in question does an aggregation of two different streams and compares them:
Since the two aggregations are over different source streams, and those source streams are not synchronized (streaming exchange runs at an unpredictable rate), sometimes they get out of sync and cause a failure. The solution is not obvious to me.
I pushed a new final commit that demonstrates the issue in more detail.
I fixed the problem (thanks @ryzhyk) |
mythical-fred
left a comment
One blocker.
The branch is now ready for review, so the commit history has to be cleaned up before merge. There are still at least two hard-blocking commit subjects in the stack: `remove integrals` and `shard_accumulate: wip`. Both violate the commit-message rule, and the latter is explicitly WIP history. Please squash/reword the branch into clean, mergeable commits before asking for final review.
Those are good suggestions but, if we are to implement them, @ryzhyk is probably the right person to do it.
ryzhyk
left a comment
The main feedback we discussed offline: it makes sense to merge Accumulator into ExchangeReceiver.
```rust
{
    type Output = (T::Output, Duration);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
```
Very clever. I didn't know how to do this. Could you add a comment explaining how this works?
Good idea. I added this comment on Timed<T>:
/// An async task that measures its runtime.
///
/// This uses the same wrapper technique as [tokio_metrics::Instrumented] or
/// [tracing::Instrument]: by implementing [Future] manually, it wraps each
/// poll with elapsed time measurement. When the future eventually
/// completes, it outputs the wrapped future's output value plus the total
/// elapsed time.
///
/// [tokio_metrics::Instrumented]: https://docs.rs/tokio-metrics/latest/tokio_metrics/struct.Instrumented.html
/// [tracing::Instrument]: https://docs.rs/tracing/latest/tracing/trait.Instrument.html
```rust
fn fixedpoint(&self, _scope: Scope) -> bool {
    false
}
fn is_input(&self) -> bool {
```
I think this is a good fix. This feature is currently only used by the string interner.
```rust
where
    F: FnMut(T),
{
    let receiver = Runtime::worker_index();
```
I suspect this was an optimization to avoid accessing thread-locals on each call. But maybe it's not an important one, especially on Linux.
```rust
sender_start: senders.start.try_into().unwrap(),
sender_end: senders.end.try_into().unwrap(),
data_len: data.len().try_into().unwrap(),

struct ExchangeClient {
```
There is an old architectural diagram below showing ExchangeSenders/ExchangeReceivers. Can we extend it to explain the role of ExchangeClient/ExchangeServer/Listeners as well?
I added this:
/// # Multihost
///
/// Workers can reside in different processes that need to exchange data over a
/// network. There is little reason to do this if the processes are on the same
/// host, so we call this "multihost" exchange. We tend to speak of processes
/// and hosts interchangeably in this context.
///
/// Multihost exchange works mostly as shown in the diagram above, except that
/// there is a network in the middle. Suppose that we have two hosts with two
/// workers each, even though they ordinarily would have more than that. Each
/// host listens on a network port with a single [ExchangeListener] and
/// constructs one [ExchangeClient] for each remote host. Data destined to a
/// worker on the same host uses local mechanisms; data destined to a worker on
/// a different host flows through an appropriate `ExchangeClient` to the remote
/// `ExchangeListener` to the correct worker.
///
/// The diagram below shows how ExchangeSender in worker 1 (ES1) sends data to
/// the ExchangeReceivers (ERs) for other workers. Data for workers 1 and 2
/// stays on the same host, so it goes directly. Data for workers 3 and 4
/// passes through the local ExchangeClient (EC1) to the remote ExchangeListener
/// (EL2), which delivers it to the remote ExchangeReceivers.
///
/// ```text
/// ┌───┐ ┌───┐
/// ──►│ES1│──┬──>│ER1│ Worker 1
/// └───┘ │ └───┘
/// │
/// ┌───┐ │ ┌───┐
/// ──►│ES2│ ├──>│ER2│ Worker 2
/// └───┘ │ └───┘
/// ↓
/// ┌───┐
/// │EC1│
/// └───┘
/// │
/// HOST 1 │
/// ───NETWORK CONNECTION───────────────────────────────────────────
/// HOST 2 │
/// ↓
/// ┌───┐
/// │EL2│
/// └───┘
/// │
/// ┌───┐ │ ┌───┐
/// ──►│ES3│ ├──>│ER3│ Worker 3
/// └───┘ │ └───┘
/// │
/// ┌───┐ │ ┌───┐
/// ──►│ES4│ └──>│ER4│ Worker 4
/// └───┘ └───┘
/// ```
```rust
impl ExchangeClient {
    async fn new(remote_address: SocketAddr, remote_workers: &Range<usize>) -> Self {
        let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
```
I don't see anything preventing us from buffering unbounded in-memory data in this channel.
```rust
fn is_ready(&self) -> bool {
    match self.exchange.kind {
        ExchangeKind::Sync => self.exchange.ready_to_receive(self.worker_index),
        ExchangeKind::Stream => self
```
This is another reason why moving the accumulator inside the operator is a good idea. The operator is always ready until flush is called, so steps will complete without waiting for remote inputs. Only when executing an eval following a flush will the operator have to wait for all remote inputs to arrive.
I revised the series. The lead-up to the new code is mostly unchanged, but:
There's a comment in the new code for ShardedAccumulatorReceiver that needs some thought. I also see an intermittent failure in
ryzhyk
left a comment
The new changes look good. I'll think about a solution for the busy-waiting problem.
```rust
npeers,
global_offset: 0,
sender_offsets: repeat_n(0, npeers).collect(),
spines: VecDeque::from([(npeers, Spine::new(factories))]),
```
We discussed this offline. This is worth documenting. I wasn't able to figure out why this needs to be a queue until you explained it to me.
I pushed a commit that should make this more obvious.
```rust
// We are called in async context, so blocking inside here has higher
// cost than one might expect, so depending on how it shows up in
// profiles this might be a good optimization target.
spine.insert(batch);
```
Maybe we should look into creating an async-friendly version of `spine.insert`.
Until now, DBSP has measured operator runtimes by the amount of wall-clock time from when they are started to when they complete. This is an overestimate, because operators are asynchronous and can yield mid-run. This commit changes the scheduler to instead instrument each individual invocation of an operator and then sum the times to get an accurate runtime. Previously it was possible to avoid profiling overhead, but I think in practice we always enabled it. Signed-off-by: Ben Pfaff <blp@feldera.com>
No particular reason, I was just reading code and confused about the bounds and it turned out they weren't needed. Signed-off-by: Ben Pfaff <blp@feldera.com>
The trait documentation says so:
```rust
/// Returns `true` if `self` is an input operator.
///
/// An input operator feeds new data into the circuit. Examples are
/// the `Input` and `Generator` operators.
fn is_input(&self) -> bool {
false
}
```
I don't know why they did not return `true` for these input operators
before. I don't know what the consequences are for returning the wrong
value (this is not in response to some bug I discovered).
Signed-off-by: Ben Pfaff <blp@feldera.com>
I thought maybe it was possible to have an operator indicate that it was already flushed immediately and then never get evaluated. This is not the case. Plus a few other minor comment improvements. Signed-off-by: Ben Pfaff <blp@feldera.com>
As far as I can tell, `eval` and `eval_owned` were the same except for a clone() call, so this simplifies things. Signed-off-by: Ben Pfaff <blp@feldera.com>
This will enable adding a field that can't be cloned, in an upcoming commit. Signed-off-by: Ben Pfaff <blp@feldera.com>
Before commit e60a8a3 ("[dbsp] Use new protocol for exchanging data among hosts."), each remote exchange RPC waited for the data to be picked up out of its mailboxes before returning. That commit changed that: sending data no longer waits for a reply from the remote host (there is in fact no reply), and nothing prevented newly arriving data from overwriting a mailbox before its previous contents had been picked up. This commit fixes the problem. Instead of introducing a wait on the RPC sender side, it introduces one on the receiver side: the receiver waits for the mailboxes it is writing to to be empty before writing to them. I noticed that we have a 2-d array of notifiers for this purpose, but we would wait on all of the notifiers in the column for a given sender in sequence. It is more efficient in time and space to simply have one notifier per sender instead, so I made that change. At the same time, I noticed that Broadcast could simply use that same Notify instead of adding its own via registering a callback, so I made that simplification too. sender_notifies is now redundant with sender_callbacks, but switching from one to the other would involve changing a lot of code, so this commit does not make that change. Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: Ben Pfaff <blp@feldera.com>
In practice these were always supplied as Runtime::worker_index() and it didn't make sense to supply any other value. Signed-off-by: Ben Pfaff <blp@feldera.com>
Until now, the code for exchange has always bound its own socket to listen for incoming connections. This works fine in what is our common case, where the listening socket is a well-known port in a container that doesn't have anything else running. But for tests, or for situations where the host running the pipeline has other software running and doesn't reserve specific ports for the pipeline, it's more reliable to bind all the listening ports (probably letting the kernel choose a port number), then start the pipelines with the port numbers known in advance. This commit allows for that. The initial use case will be unit tests, in upcoming commits. Signed-off-by: Ben Pfaff <blp@feldera.com>
By moving this out of `Exchange` into an argument for try_receive_all(), we move it closer to where it's actually needed, and it's no longer necessary to box it. Signed-off-by: Ben Pfaff <blp@feldera.com>
This separation was painful and I'm glad to get rid of it. Signed-off-by: Ben Pfaff <blp@feldera.com>
The word `Service` wasn't doing anything here. Signed-off-by: Ben Pfaff <blp@feldera.com>
Until now, multihost exchange has accumulated the data sent from one host to another until all of it is available, then sent it all together. This means that serialization and deserialization was delayed until all the data was available. This commit changes the code to instead send data as it becomes available. This commit also makes more use of async functions in the exchange code. This also adds unit tests for multihost exchange. Signed-off-by: Ben Pfaff <blp@feldera.com>
We introduce the `shard_accumulate` operator, which is functionally equivalent to `.shard().accumulate()`, but allows a more efficient distributed implementation that overlaps communication and computation.

Until now every exchange operator behaved as a barrier at each step of the circuit: operators downstream of an exchange could only be evaluated after the receiving side of the exchange has received inputs from all peers. Most of the time this is unnecessary: exchange, and in particular `shard`, which is the most common application of exchange, is almost always followed by an accumulator and an integral (the reason we shard is to create indexed integrals for operators like join, aggregate, and distinct). We therefore only need a barrier when committing a transaction, i.e., after a `flush`.

This leads to an optimized exchange that does all the sending and receiving in the background, only stopping to wait for input from peers post-flush. For this to work well, it makes sense to combine exchange with an accumulator to accumulate inputs in a spine (which handles merging small updates in the background and pushing state to disk).

This commit introduces the `shard_accumulate` operator, which is currently implemented as `.shard().accumulate()`, and replaces individual calls to `shard` and `accumulate` with `shard_accumulate` where appropriate. Upcoming commits will replace the naive implementation with an optimized async implementation.

TODOs:
- This optimization does not yet apply to adaptive joins, which don't use the `shard` operator. The approach applies there as well, but it needs more work.

Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Testing intermediate results often failed because only the final result for each aggregation is predictable, whereas the number of steps needed to arrive at the final results varies. Thanks to Leonid Ryzhyk for helping me find the solution. Signed-off-by: Ben Pfaff <blp@feldera.com>
This refactors the exchange code slightly to make it usable by the new sharded accumulator implementation that is coming up in a later commit: * Rename Receiver to ExchangeDelivery. * Rename Clients to ExchangeClients. * Change ExchangeDirectory from a typename into a properly encapsulated type. * Give ExchangeClients a more convenient constructor. Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: Ben Pfaff <blp@feldera.com>
This is a theoretical concern. Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: Ben Pfaff <blp@feldera.com>
Make `cargo test -- --nocapture` less noisy. Signed-off-by: Leonid Ryzhyk <ryzhyk@gmail.com>
Signed-off-by: Ben Pfaff <blp@feldera.com>
It's easy enough to get a batch out of an `Arc` if it has a reference count of 1, and we always ultimately need it in an `Arc`, so just always take the argument in one. The new function Spine::maybe_flush_batch will have another user in an upcoming commit. Signed-off-by: Ben Pfaff <blp@feldera.com>
Signed-off-by: Ben Pfaff <blp@feldera.com>
…r rx. Rxq::deliver(), which is called holding an exclusive lock, could both do I/O to spill a batch to storage and block waiting for merging to occur in the spine. This is fine when evaluating a normal operator but it is bad on the sharded accumulator delivery path because it will force any other sender to block until it is finished also. This commit adds a method Spine::insert_without_blocking which does not include either of those behaviors and modifies the sharded accumulator delivery path to use the new method. It adds code to do the I/O, when necessary, before calling the new method, and it adds a call to a new method Spine::backpressure_wait to do all the waiting for merging in one place just before receive (which might even be more efficient). This commit unfortunately also includes a lot of chaff to deal with turning Spine::insert into an async function. This is necessary because we now need to wait on it from an async context, which means the Condvar, which is not async-capable, needs to become a Notify, which only works in an async context. Spine::insert indeed runs inside an async context most of the time (operator evaluation occurs inside one) but one can't enter such a context nested (`block_on` and such will panic), so instead one must thread async all the way in. Ordinarily I'd try to factor that change out into a separate commit, but it was rather tangled in this case. Signed-off-by: Ben Pfaff <blp@feldera.com>
Until now, the spine has started a merger task for every level when it is created. This was OK for the way we used the spine at the time: there were only a few of them and they were all created when we created the circuit. Now, however, the sharded accumulator creates a spine for every transaction, so the cost of creating it matters. This commit changes the spine to only start a single merger task and use it for all of the merges at all the levels. This improves runtime in release mode for the issue4168 test from 11.4 seconds to 3.3 seconds. Signed-off-by: Ben Pfaff <blp@feldera.com>
Please read the individual commits.
I'm currently working to figure out why one test fails (see final commit). That's why this is being submitted as a draft PR. After I fix it, I'll mark it ready for review.