added priority support by pengyk · Pull Request #784 · taskflow/taskflow · GitHub

added priority support #784

Open
pengyk wants to merge 4 commits into taskflow:master from pengyk:priorities

pengyk commented Apr 20, 2026


Summary

  • Add a priority-aware scheduling pipeline (prioritized_run()) that respects TaskPriority::HIGH, TaskPriority::NORMAL, and TaskPriority::LOW when choosing which ready task to execute next
  • Introduce per-worker priority work-stealing queues (_prio_wsq[3]) and a StagingQueue buffer that batches NORMAL/LOW tasks while pushing HIGH tasks immediately
  • Gate all priority logic behind an atomic _num_prioritized counter so the existing run() path incurs zero overhead
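The selection rule in the summary can be illustrated with a minimal, single-threaded sketch. It is not the PR's implementation: `ToyWorker`, `next_task`, the numeric enum values, and the use of `std::deque` in place of lock-free work-stealing queues are all assumptions made for illustration. Only the `TaskPriority` names and the idea of three per-priority queues come from the description.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>

// Priority levels named in the PR summary; the numeric values are assumed.
enum class TaskPriority : std::size_t { HIGH = 0, NORMAL = 1, LOW = 2 };

// Hypothetical stand-in for a worker: one plain deque per priority level.
// The real code uses lock-free work-stealing queues instead.
struct ToyWorker {
  std::array<std::deque<int>, 3> queues;

  void push(int task, TaskPriority p) {
    queues[static_cast<std::size_t>(p)].push_back(task);
  }

  // Scan HIGH -> NORMAL -> LOW and pop from the first non-empty queue.
  std::optional<int> next_task() {
    for (auto& q : queues) {
      if (!q.empty()) {
        int task = q.back();
        q.pop_back();
        return task;
      }
    }
    return std::nullopt;
  }
};
```

In this toy model a LOW task pushed first is still returned last, because `next_task()` always drains the higher-priority queues before looking at lower ones.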

Design

  • Priority-ordered stealing: _prio_sweep_task() scans all workers and buffers in HIGH → NORMAL → LOW order, guaranteeing that no lower-priority task is taken while a higher-priority task exists anywhere.
    A macro, TF_ENFORCE_PRIORITY_EXPLOIT, enables a cross-worker global sweep during the exploit phase.

  • Continuation cache: _prio_update_cache() keeps the higher-priority successor in the cache and stages the lower one, preserving the existing zero-atomic-op continuation optimization while respecting priorities.

  • Staging buffer: NORMAL and LOW tasks accumulate in a per-worker StagingQueue during task invocation and are flushed to the priority-aware work-stealing queues after each invoke via _prio_flush(). HIGH tasks bypass the buffer entirely.
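The staging idea from the last bullet can be sketched as follows. This is a simplified, single-threaded model under stated assumptions: `ToyTask`, `ToyStagingWorker`, `schedule`, and `flush` are hypothetical names, and plain vectors stand in for both the StagingQueue and the per-priority work-stealing queues.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <vector>

enum class TaskPriority : std::size_t { HIGH = 0, NORMAL = 1, LOW = 2 };

struct ToyTask {
  int id;
  TaskPriority priority;
};

// Hypothetical model of the staging idea: `visible` stands in for the
// per-priority work-stealing queues that other workers can steal from.
struct ToyStagingWorker {
  std::array<std::vector<ToyTask>, 3> visible;
  std::vector<ToyTask> staged;  // NORMAL/LOW tasks hidden until flush()

  void schedule(ToyTask t) {
    if (t.priority == TaskPriority::HIGH) {
      visible[0].push_back(t);  // HIGH bypasses the buffer
    } else {
      staged.push_back(t);      // NORMAL/LOW wait in the buffer
    }
  }

  // Called after the current task finishes: publish everything staged.
  void flush() {
    for (const ToyTask& t : staged) {
      visible[static_cast<std::size_t>(t.priority)].push_back(t);
    }
    staged.clear();
  }

  std::size_t visible_count() const {
    return visible[0].size() + visible[1].size() + visible[2].size();
  }
};
```

Until `flush()` runs, a thief inspecting `visible` can only find HIGH tasks, which is the effect the staging buffer is meant to achieve.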

Some benchmarks

Uniform Independent Tasks with Mixed Priorities (128 equal-cost tasks, round-robin H/N/L)

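Mean

| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 11.1 | 10.6 | 9.52 | -5% | -14% |
| 2 | 9.63 | 12.9 | 13.8 | +34% | +43% |
| 4 | 18.7 | 25.4 | 23.7 | +36% | +27% |
| 8 | 28.3 | 30.2 | 31.6 | +7% | +12% |
| 16 | 26.3 | 33.0 | 35.8 | +25% | +36% |
| 32 | 27.4 | 32.7 | 39.7 | +19% | +45% |

Median

| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 10.2 | 9.67 | 8.67 | -5% | -15% |
| 2 | 9.29 | 12.2 | 13.3 | +31% | +43% |
| 4 | 20.7 | 26.7 | 22.2 | +29% | +7% |
| 8 | 30.3 | 30.5 | 31.6 | +1% | +4% |
| 16 | 25.1 | 32.8 | 34.3 | +31% | +37% |
| 32 | 25.7 | 31.5 | 39.0 | +23% | +52% |

P90

| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 14.8 | 13.8 | 11.3 | -7% | -24% |
| 2 | 11.0 | 14.7 | 17.3 | +34% | +57% |
| 4 | 26.4 | 36.2 | 32.9 | +37% | +25% |
| 8 | 38.1 | 37.7 | 39.4 | -1% | +3% |
| 16 | 41.5 | 46.9 | 47.8 | +13% | +15% |
| 32 | 43.3 | 46.7 | 48.7 | +8% | +12% |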

Fan-Out with Skewed Work Costs (300 tasks: 50 expensive HIGH, 100 medium NORMAL, 150 cheap LOW)

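Mean

| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 14.9 | 17.4 | 16.9 | +17% | +13% |
| 2 | 19.7 | 22.5 | 41.7 | +14% | +112% |
| 4 | 41.4 | 33.6 | 57.2 | -19% | +38% |
| 8 | 68.7 | 48.7 | 52.7 | -29% | -23% |
| 16 | 54.4 | 56.9 | 63.0 | +5% | +16% |
| 32 | 47.7 | 52.3 | 77.9 | +10% | +63% |

Median

| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 14.8 | 17.2 | 16.0 | +16% | +8% |
| 2 | 19.7 | 18.9 | 32.4 | -4% | +64% |
| 4 | 45.4 | 29.4 | 59.0 | -35% | +30% |
| 8 | 71.8 | 48.5 | 51.4 | -32% | -28% |
| 16 | 58.7 | 58.7 | 58.2 | 0% | -1% |
| 32 | 40.1 | 51.8 | 69.1 | +29% | +72% |

P90

| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 15.9 | 18.7 | 19.2 | +18% | +21% |
| 2 | 23.5 | 38.3 | 87.8 | +63% | +274% |
| 4 | 58.8 | 50.4 | 73.8 | -14% | +26% |
| 8 | 97.4 | 57.5 | 79.5 | -41% | -18% |
| 16 | 82.4 | 72.5 | 96.7 | -12% | +17% |
| 32 | 80.7 | 75.2 | 115 | -7% | +43% |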

Large Fan-Out with Skewed Work Costs (1000 tasks: 100 expensive HIGH, 300 medium NORMAL, 600 cheap LOW)

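Mean

| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 27.4 | 36.8 | 34.0 | +34% | +24% |
| 2 | 73.3 | 83.7 | 102 | +14% | +39% |
| 4 | 138 | 109 | 95.7 | -21% | -31% |
| 8 | 200 | 126 | 140 | -37% | -30% |
| 16 | 166 | 124 | 167 | -25% | +1% |
| 32 | 127 | 120 | 204 | -6% | +61% |

Median

| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 26.3 | 36.3 | 32.7 | +38% | +24% |
| 2 | 68.0 | 78.7 | 99.6 | +16% | +46% |
| 4 | 147 | 111 | 84.0 | -24% | -43% |
| 8 | 210 | 128 | 141 | -39% | -33% |
| 16 | 181 | 124 | 155 | -31% | -14% |
| 32 | 135 | 123 | 195 | -9% | +44% |

P90

| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 29.7 | 40.9 | 38.0 | +38% | +28% |
| 2 | 85.7 | 94.5 | 113 | +10% | +32% |
| 4 | 160 | 128 | 162 | -20% | +1% |
| 8 | 240 | 142 | 157 | -41% | -35% |
| 16 | 224 | 156 | 238 | -30% | +6% |
| 32 | 203 | 140 | 278 | -31% | +37% |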

Few Expensive Tasks Among Many Cheap Ones (10 heavy HIGH "whales" + 1000 near-instant LOW "minnows")

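Mean

| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 26.4 | 40.6 | 35.6 | +54% | +35% |
| 2 | 85.0 | 85.2 | 109 | 0% | +28% |
| 4 | 127 | 120 | 100 | -6% | -21% |
| 8 | 231 | 117 | 149 | -49% | -35% |
| 16 | 126 | 132 | 183 | +5% | +45% |
| 32 | 130 | 125 | 215 | -4% | +65% |

Median

| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 25.6 | 35.9 | 34.6 | +40% | +35% |
| 2 | 72.5 | 75.5 | 106 | +4% | +46% |
| 4 | 133 | 124 | 91.9 | -7% | -31% |
| 8 | 241 | 123 | 149 | -49% | -38% |
| 16 | 134 | 128 | 168 | -4% | +25% |
| 32 | 144 | 124 | 203 | -14% | +41% |

P90

| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 30.8 | 51.4 | 39.2 | +67% | +27% |
| 2 | 157 | 128 | 120 | -18% | -24% |
| 4 | 162 | 149 | 144 | -8% | -11% |
| 8 | 292 | 152 | 159 | -48% | -46% |
| 16 | 190 | 153 | 256 | -19% | +35% |
| 32 | 205 | 161 | 294 | -21% | +43% |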

Layered Pipeline with Dependencies (5 layers x 20 tasks, each layer depends on the previous; HIGH tasks gate downstream work)

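Mean

| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 13.1 | 14.2 | 13.2 | +8% | +1% |
| 2 | 25.9 | 29.8 | 23.9 | +15% | -8% |
| 4 | 40.3 | 41.4 | 46.2 | +3% | +15% |
| 8 | 44.1 | 45.4 | 48.9 | +3% | +11% |
| 16 | 47.3 | 43.8 | 44.6 | -7% | -6% |
| 32 | 44.0 | 45.8 | 50.5 | +4% | +15% |

Median

| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 12.8 | 13.5 | 12.8 | +5% | 0% |
| 2 | 18.6 | 21.9 | 24.8 | +18% | +33% |
| 4 | 42.2 | 45.0 | 44.8 | +7% | +6% |
| 8 | 47.5 | 48.3 | 51.5 | +2% | +8% |
| 16 | 46.7 | 38.7 | 36.6 | -17% | -22% |
| 32 | 41.3 | 40.1 | 41.6 | -3% | +1% |

P90

| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 16.7 | 18.4 | 14.7 | +10% | -12% |
| 2 | 49.6 | 57.3 | 31.8 | +16% | -36% |
| 4 | 66.7 | 67.4 | 69.8 | +1% | +5% |
| 8 | 60.6 | 68.3 | 68.9 | +13% | +14% |
| 16 | 74.3 | 74.2 | 81.7 | 0% | +10% |
| 32 | 73.8 | 76.8 | 93.4 | +4% | +27% |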

Thread-Matched Scheduling Test (N HIGH tasks + N LOW tasks where N = thread count; HIGH work = N x LOW work, all independent)

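Mean

| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 8.16 | 17.7 | 6.43 | +117% | -21% |
| 2 | 7.95 | 8.67 | 7.54 | +9% | -5% |
| 4 | 10.7 | 8.99 | 10.3 | -16% | -4% |
| 8 | 12.5 | 11.8 | 13.1 | -6% | +5% |
| 16 | 18.8 | 19.9 | 23.0 | +6% | +22% |
| 32 | 51.7 | 23.7 | 33.1 | -54% | -36% |

Median

| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 7.75 | 12.0 | 7.04 | +55% | -9% |
| 2 | 7.40 | 8.17 | 7.75 | +10% | +5% |
| 4 | 7.92 | 7.98 | 10.5 | +1% | +33% |
| 8 | 11.6 | 11.1 | 13.3 | -4% | +15% |
| 16 | 18.1 | 18.7 | 23.4 | +3% | +29% |
| 32 | 22.2 | 21.3 | 32.8 | -4% | +48% |

P90

| Threads | Run | Prio | Enforced | Prio vs Run | Enforced vs Run |
|---|---|---|---|---|---|
| 1 | 12.5 | 21.0 | 8.87 | +68% | -29% |
| 2 | 14.8 | 15.3 | 9.92 | +3% | -33% |
| 4 | 18.4 | 15.3 | 13.3 | -17% | -28% |
| 8 | 21.5 | 19.1 | 19.0 | -11% | -12% |
| 16 | 30.7 | 31.8 | 32.4 | +4% | +6% |
| 32 | 47.3 | 34.3 | 42.5 | -27% | -10% |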

GitHub gist of the benchmarks: https://gist.github.com/pengyk/af6df7e2bbf1ec341473af012b563102

pengyk marked this pull request as ready for review April 21, 2026 20:14
@tsung-wei-huang

Member

Hi @pengyk, thank you for this pull request! I have the following questions:

  1. The purpose of the StagingQueue is not very clear to me. Could you please elaborate on it a bit more?
  2. I was thinking each worker only keeps a std::array<wsq_type, 3> _wsqs; where HIGH_PRIORITY=0, MEDIUM_PRIORITY=1, and LOW_PRIORITY=2, so we can simply iterate over the three queues in order. In this case, steal and pop at the executor level will remain pretty much the same, but with an additional loop that visits the highest priority first. push is easier since we can just insert a node into the corresponding _wsqs[node->_priority].

I think perhaps the most efficient way to support priority is to integrate it natively into the work-stealing queue (WSQ). Supporting priority for the unbounded WSQ might be a bit challenging now, but I feel it is possible for the bounded WSQ. Basically, we will maintain P priority splits on the ring buffer, where P can be 1 (default), 2, 4, etc. Each split works as a standalone WSQ within a single storage. We want P to be a power of two so we can use efficient bit masking. In this case, the queue can natively support priority scheduling without any additional structure like std::array<wsq_type, 3> _wsqs;. Thoughts?

template <
  typename T,
  size_t LogSize = TF_DEFAULT_BOUNDED_TASK_QUEUE_LOG_SIZE,
  size_t LogSplitSize = 0  // default 2^0 = 1, no priority support
>
class BoundedWSQ {
  // ... we will support all priority-based push/pop/steal here.
};
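One possible reading of "P priority splits on the ring buffer" is sketched below. The thread does not specify an index layout, so everything here is an assumption: `SplitIndexer` and `slot` are hypothetical names, and the layout places the split number in the high bits and the masked position in the low bits of the slot index.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical index arithmetic for a ring buffer of 2^LogSize slots that is
// divided into 2^LogSplitSize equal, contiguous splits (one per priority).
template <std::size_t LogSize, std::size_t LogSplitSize = 0>
struct SplitIndexer {
  static_assert(LogSplitSize <= LogSize, "more splits than slots");

  static constexpr std::size_t num_splits = std::size_t{1} << LogSplitSize;
  static constexpr std::size_t split_size = std::size_t{1} << (LogSize - LogSplitSize);
  static constexpr std::size_t split_mask = split_size - 1;

  // Map (priority, monotonically increasing position) to a storage slot.
  static constexpr std::size_t slot(std::size_t priority, std::size_t pos) {
    return (priority << (LogSize - LogSplitSize)) | (pos & split_mask);
  }
};

// 8 slots, 2 splits: split 0 owns slots 0..3 and split 1 owns slots 4..7.
static_assert(SplitIndexer<3, 1>::slot(0, 5) == 1);
static_assert(SplitIndexer<3, 1>::slot(1, 5) == 5);
// With the default LogSplitSize = 0 the whole buffer is a single ring.
static_assert(SplitIndexer<3>::slot(0, 9) == 1);
```

Because both the split count and the split size are powers of two, wrapping within a split is a single AND and selecting the split is a single shift, which is the "efficient bit masking" the comment refers to.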

pengyk commented May 18, 2026
Author

Hi @tsung-wei-huang thanks for taking a look!

  1. It is just to reduce the chance that, while a worker is still pushing its successors, another thread greedily steals a low-priority task. It "hides" the normal- and low-priority tasks and only exposes the high-priority tasks at first, so that high-priority tasks are guaranteed to become visible to thieves first. Imagine a graph like this:
graph TD
    A[Root Node] --> B[Node 1<br/>Low priority]
    A --> C[Node 2<br/>Low priority]
    A --> D[Node 3<br/>Low priority]
    A --> E[Node 4<br/>High priority]
    A --> F[Node 5<br/>High priority]

If we have 3 threads and tasks are made ready in the given order, I think we can guarantee that the high priority tasks will all get executed first.

I do agree that this might be a little overkill and might degrade performance, so I am open to removing it.

I think the rest is pretty much just a std::array<wsq_type, 3> _wsqs, except that I named it _prio_wsq.

  2. This is a good idea! My one concern is that right now we can execute multiple "topologies" on the same resources. So let's say we have one graph with 1 priority level, another with 2, and another with 8: wouldn't it be hard to keep track of which index is available for pushes? Also, would the semantics for defining the priority be harder to reason about? Right now it is simply 3 hard-coded enums for high/medium/low.

Let me know if that isn't clear, thanks!

tsung-wei-huang commented May 22, 2026
Member

@pengyk Thank you for the response! After a careful review, I suggest the following modification to refactor the pull:

  1. Given the ultimate randomness of the work-stealing scheduler, I agree with you that the Staging Queue is overkill. Its cost seems to overshadow its benefit.
  2. I suggest we focus on having priority-based scheduling at the worker level first, rather than in the centralized spilling queue. The design of the centralized spilling queue is still being tested and evaluated in terms of its space and runtime tradeoff, and may require some future changes (e.g., NUMA adoption). On the other hand, the worker-level queue design is relatively fixed and stable. So let's focus on bringing priority support to the worker-level queue for now.

With 2, I suggest the following refactoring with a new class BoundedPriorityWSQ that abstracts all the pop/steal/push operations with priority support, so we can minimize the modification of tf::Executor:

// let's define a concept for work-stealing queue
template <typename Q>
concept BoundedWSQLike = requires(Q& q) {
  { q.steal() };
  { q.pop() };
  { q.try_push() } -> std::convertible_to<bool>;  // item argument omitted in this sketch; unbounded wsq instead has push -> void
};

// top wrapper over WSQ - and yes, I agree with you that LogPriority is not a good idea. Let's just template it linearly
template <BoundedWSQLike Q, size_t MaxPriority>
class BoundedPriorityWSQ {

  public:
    // here we provide all BoundedWSQ methods with priority i to pin the operation to specific _wsqs[i]
    template <typename O>
    bool try_push(size_t priority, O&& item);

    // ... similarly for other methods

    // for methods that don't take a priority, we will, by default, perform the batch operation from i=0 to MaxPriority-1
    // note that i=0 means the highest priority
    template <typename O>
    bool try_push(O&& item);

    // ... similarly for other methods

  private:
    std::array<Q, MaxPriority> _wsqs;
};

Now, the advantage of this is that we can statically unroll all batch operations (e.g., BoundedPriorityWSQ::try_push(O&& item)) in the correct priority order.

// let's use the static unroll_until https://github.com/taskflow/taskflow/blob/a215b4578338cc27cba5c4772c4f73b61a538d42/taskflow/utility/traits.hpp#L164
template<auto beg, auto end, auto step, typename F>
constexpr bool unroll_until(F&& f) {
  return [&]<auto... Is>(std::index_sequence<Is...>) {
    return (f(beg + Is * step) || ...);
  }(std::make_index_sequence<(end - beg + step - 1) / step>{});
}


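template <BoundedWSQLike Q, size_t MaxPriority>
template <typename O>
bool BoundedPriorityWSQ<Q, MaxPriority>::try_push(O&& item) {
  // the end bound of unroll_until is exclusive, so pass MaxPriority to visit queues 0..MaxPriority-1
  return unroll_until<0, MaxPriority, 1>([&](size_t i){ return _wsqs[i].try_push(std::forward<O>(item)); });
}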
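Given the definition of unroll_until quoted above, the index sequence has (end - beg + step - 1) / step elements, so end acts as an exclusive bound: visiting queues 0 through MaxPriority-1 needs end = MaxPriority. The sketch below reproduces the helper and records which indices are visited; `visited_until` is a hypothetical test helper, not part of Taskflow.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Same shape as the helper quoted above (requires C++20 templated lambdas).
template <auto beg, auto end, auto step, typename F>
constexpr bool unroll_until(F&& f) {
  return [&]<auto... Is>(std::index_sequence<Is...>) {
    return (f(beg + Is * step) || ...);
  }(std::make_index_sequence<(end - beg + step - 1) / step>{});
}

// Record which indices are visited before the callback first returns true.
template <std::size_t End>
std::vector<std::size_t> visited_until(std::size_t stop_at) {
  std::vector<std::size_t> seen;
  unroll_until<std::size_t{0}, End, std::size_t{1}>([&](std::size_t i) {
    seen.push_back(i);
    return i == stop_at;  // returning true short-circuits the fold
  });
  return seen;
}
```

With `End = 3` and a callback that never succeeds, indices 0, 1, and 2 are visited; with `End = 2`, index 2 is never reached. The fold over `||` also stops at the first index whose callback returns true.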

In this case, I don't think we even need prioritized_run? Thoughts?

pengyk commented May 26, 2026
Author

Hi @tsung-wei-huang ,

Yeah, that sounds good! I think this is a cleaner implementation. I am wondering if you have any preference on the value of MaxPriority. Right now it is 3; is that OK, or would you want it to be configurable?

@tsung-wei-huang

Member

@pengyk A configurable number is definitely better, but we can default it to 3. Also, regarding setting the priority, instead of having a separate _priority member in tf::Node, we may reuse its _nstate, e.g., by reserving 2 bits for priority, so we can save some space (see NSTATE in taskflow/core/error.hpp).

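// the default value goes on the class template declaration
template <BoundedWSQLike Q, size_t MaxPriority = 3>
class BoundedPriorityWSQ;

template <BoundedWSQLike Q, size_t MaxPriority>
template <typename O>
bool BoundedPriorityWSQ<Q, MaxPriority>::try_push(O&& item) {
  return unroll_until<0, MaxPriority, 1>([&](size_t i){ return _wsqs[i].try_push(std::forward<O>(item)); });
}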
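The suggestion to reserve 2 bits of the node state for the priority can be sketched with plain bit arithmetic. The layout below is purely an assumption for illustration: `nstate_t`, `PRIORITY_SHIFT`, `PRIORITY_MASK`, `set_priority`, and `get_priority` are hypothetical names, and the actual bit positions used by Taskflow are not stated in this thread.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical layout: the two highest bits of a 32-bit state word hold the
// priority; the remaining bits stay available for other state flags.
using nstate_t = std::uint32_t;

constexpr nstate_t PRIORITY_SHIFT = 30;
constexpr nstate_t PRIORITY_MASK  = nstate_t{0x3} << PRIORITY_SHIFT;

// Overwrite the priority bits while preserving every other bit.
constexpr nstate_t set_priority(nstate_t state, nstate_t priority) {
  return (state & ~PRIORITY_MASK) | ((priority & 0x3) << PRIORITY_SHIFT);
}

// Extract the priority (0 = highest) from the state word.
constexpr nstate_t get_priority(nstate_t state) {
  return (state & PRIORITY_MASK) >> PRIORITY_SHIFT;
}

static_assert(get_priority(set_priority(0x5u, 2)) == 2);
static_assert((set_priority(0x5u, 2) & ~PRIORITY_MASK) == 0x5u);
static_assert(get_priority(0x5u) == 0);  // untouched state reads as priority 0
```

A side effect of this kind of packing is that a state word whose priority bits were never written reads as priority 0, so the meaning assigned to 0 becomes the default priority.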

If you have any questions or any part that you think I can jump in or help, please don't hesitate to let me know.

pengyk commented May 26, 2026
Author

@tsung-wei-huang Yeah, that sounds good! I will try to get something out before the end of the week. Thanks for the help! I will let you know if any questions come up.

pengyk commented Jun 1, 2026
Author

Hi @tsung-wei-huang, I think I have made the requested changes. Let me know if anything is unclear, thanks!

@tsung-wei-huang

Member

@pengyk thank you - I will have a look soon and get back to you! Before that, I will make a release first to include several other changes to the profiler, doc examples, and other scheduler-level micro-optimizations. Really appreciate your contributions!

tsung-wei-huang commented Jun 10, 2026
Member

Hi @pengyk, I finally got time to look at this pull - thank you! Changes below:

  1. I decoupled the originally hard-coded priority() from BoundedPriorityWSQ by making it a template. This allows applications to return the priority from their own data type more generally.
  2. I assigned the bit state 00 as high, 01 as normal, and 10 as low, consistent with your TaskPriority definition. This makes the priority return more straightforward (i.e., no extra remapping like queue_map = {0, 0, 1, 2}). By default, if users don't assign a priority, it is high, and the scanning in push/pop/steal always goes from 0 to MaxPriority-1, naturally.
  3. The biggest change is try_bulk_push, where I delegate the push to the individual queues' try_bulk_push rather than counting on the executor to post-process the return value. The modification here performs a linear scan and calls try_bulk_push whenever the current priority differs from the previous one. first is naturally updated by each individual try_bulk_push. When no priority is given at all, this function bulk-pushes all nodes in one call, at almost no extra cost.
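  template <typename I>
  size_t try_bulk_push(I& first, size_t N) {

    if(N == 0) return 0;

    size_t remaining = N;

    while(remaining > 0) {
      // find the run of consecutive items that share the priority of first[0]
      size_t q = _priority_fn(first[0]);
      size_t run_len = 1;
      while(run_len < remaining && _priority_fn(first[run_len]) == q) {
        ++run_len;
      }
      // push the whole run into its queue; this advances first by the number inserted
      size_t inserted = _wsqs[q].try_bulk_push(first, run_len);
      remaining -= inserted;
      // stop if the target queue could not take the whole run
      if(inserted < run_len) {
        break;
      }
    }

    return N - remaining;
  }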
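The run-splitting behaviour of this bulk push can be exercised with a small toy model. The sketch below is not the PR's code: `ToyNode`, `ToyBulkQueue`, and `ToyPriorityQueues` are hypothetical, the priority is read directly from a field instead of through `_priority_fn`, and the toy queue is assumed to advance `first` past the items it accepts.

```cpp
#include <algorithm>
#include <array>
#include <cassert>
#include <cstddef>
#include <vector>

struct ToyNode {
  int id;
  std::size_t priority;  // 0 = highest
};

// Hypothetical bounded queue whose bulk push advances `first` past the items
// it accepted, mirroring the "first is updated" behaviour described above.
struct ToyBulkQueue {
  std::vector<ToyNode*> items;
  std::size_t capacity = 4;

  template <typename I>
  std::size_t try_bulk_push(I& first, std::size_t n) {
    std::size_t room = capacity - items.size();
    std::size_t take = std::min(n, room);
    for (std::size_t i = 0; i < take; ++i) items.push_back(*first++);
    return take;
  }
};

struct ToyPriorityQueues {
  std::array<ToyBulkQueue, 3> wsqs;

  // Split [first, first + n) into runs of equal priority, one bulk push per run.
  template <typename I>
  std::size_t try_bulk_push(I& first, std::size_t n) {
    std::size_t remaining = n;
    while (remaining > 0) {
      std::size_t q = first[0]->priority;
      std::size_t run_len = 1;
      while (run_len < remaining && first[run_len]->priority == q) ++run_len;
      std::size_t inserted = wsqs[q].try_bulk_push(first, run_len);
      remaining -= inserted;
      if (inserted < run_len) break;  // target queue is full
    }
    return n - remaining;
  }
};
```

For an input whose priorities are 0, 0, 2, 2, 2, 1, the loop issues three bulk pushes (runs of length 2, 3, and 1). If every node has the same priority, the whole range goes through a single bulk push, which matches the "almost no extra cost" remark.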

With these changes, the executor stays mostly the same except for the _update_cache function, which I slightly refactored as follows:

  if(cache) {
    if(node->priority() < cache->priority()) {
      std::swap(cache, node);
    }    
    _schedule(worker, node);
  } else {
    cache = node;
  }
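The effect of this cache update can be checked in isolation with a toy version. In the sketch below, `ToyNode`, the free function `update_cache`, and the `scheduled` vector (standing in for whatever `_schedule(worker, node)` pushes to) are hypothetical; only the branch structure follows the snippet above.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

struct ToyNode {
  int id;
  std::size_t prio;  // 0 = highest
  std::size_t priority() const { return prio; }
};

// Hypothetical free-function version of the cache update: `scheduled` stands
// in for whatever _schedule(worker, node) would push to the worker's queues.
inline void update_cache(ToyNode*& cache, ToyNode* node,
                         std::vector<ToyNode*>& scheduled) {
  if (cache) {
    if (node->priority() < cache->priority()) {
      std::swap(cache, node);   // the more urgent node takes the cache slot
    }
    scheduled.push_back(node);  // the less urgent one goes to the queues
  } else {
    cache = node;
  }
}
```

After any sequence of calls, the cache holds the most urgent successor seen so far, and every other successor has been handed to the scheduler.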

Please let me know your thoughts. Additionally, do you have a better suggestion for the name BoundedPriorityWSQ? I think we also need unit tests for BoundedPriorityWSQ. Would you like to help?

pengyk commented Jun 12, 2026
Author

Hi @tsung-wei-huang thanks for looking!

The changes make sense to me. As for the renaming, maybe something like BoundedPriorityWSQSet, to show that it is a set of multiple WSQs? Or BoundedWSQSet directly?

Yeah I can work on the unit tests for that! Do you want it to be part of this PR or a separate one?

tsung-wei-huang commented Jun 12, 2026
Member

@pengyk Let's make it in this PR for consistency. You can add it to the existing work-stealing unittest file, unittests/test_wsq.cpp. Thank you!

pengyk commented Jun 24, 2026
Author
