I’ve been thinking a lot about continuations and directed graphs over the last couple of days, and about different ways to implement them. I’ve also been looking more closely at std::thread, std::future and std::async to make sure I fully understand them. So I decided to make a (small) multi-part series out of this and show some simple sketches of continuation implementations using the Parallel Patterns Library (PPL), std::future and std::async, and the Agents Library.

This is part 1, which covers a simple implementation of ‘run_when’ and ‘wait_for_all’ using the PPL.

First a simple dependency / continuation

Here’s an incredibly simple dependency: task 2 depends on task 1.

#include <stdio.h>

// Stand-in for a unit of real work, e.g. building one project.
void ContinueableTask(int in){
    printf("building project: %d\n", in);
}

void SimpleContinuation(){
    ContinueableTask(1);

    //task 2 depends on task 1
    ContinueableTask(2);
}

 

There’s not a lot of concurrency here with only two tasks, yet we can still make this execution ‘parallel’ with a task_group as follows:
// task_group lives in <ppl.h>, in the Concurrency namespace
void SimpleContinuationTasks(){
    //task 1
    task_group task1;
    task1.run([](){ContinueableTask(1);});

    //task 2 depends on task 1
    task_group task2;
    task2.run([&](){
        task1.wait();
        ContinueableTask(2);
    });

    //wait for task 2
    task2.wait();
}

 

Making it look a bit cleaner

 
The code here isn’t too bad, but it is a little awkward. It would sure be nice to write something like this instead:
void SimpleContinuation(){
    auto task1 = run_task([](){ContinueableTask(1);});
    //task 2 depends on task 1
    auto task2 = run_when(task1, [](){ContinueableTask(2);});
    wait_for_all(task1, task2);
}

 

Implementing this is straightforward in the PPL (in part 2 you’ll see that it’s also straightforward with std::future and std::async and that run_task looks *a lot* like std::async). 

typedef shared_ptr<task_group> shared_task_ptr;

// Take the functor by value so temporaries (such as lambdas) can be passed directly.
template<typename Func>
inline shared_task_ptr run_task(Func fn){
    shared_task_ptr tasks = make_shared<task_group>();
    tasks->run(fn);
    return tasks;
}

 

I’m using a shared_ptr here because task_group is non-copyable and this allows me to return it by value without having to do anything unnatural in the interface (like create one then pass it in by reference).

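run_when is also pretty straightforward. It wraps the wait and the continuation in a new task, just as task2 did in SimpleContinuationTasks, so the caller isn’t blocked while the dependency finishes. Here is a template implementation:

template<typename Func>
inline shared_task_ptr run_when(shared_task_ptr tasks, Func fn){
    //capture by value so the dependency stays alive until the continuation has run
    auto continuation = [tasks, fn](){
        tasks->wait();
        fn();
    };
    return run_task(continuation);
}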
 
And finally, wait_for_all:

inline void wait_for_all(shared_task_ptr t1, shared_task_ptr t2){
    t1->wait();
    t2->wait();
}

Expanding this is straightforward

 
It should be pretty easy to see that you can expand run_when and wait_for_all to include overloads for more parameters and that writing an example with more complex dependencies is straightforward:
void MoreComplexContinuations(){
    auto p1 = run_task([](){ContinueableTask(1);});
    auto p2 = run_task([](){ContinueableTask(2);});
    auto p3 = run_task([](){ContinueableTask(3);});
    auto p4 = run_when(p1, [](){ContinueableTask(4);});
    auto p5 = run_when(p2, p3, [](){ContinueableTask(5);});
    auto p6 = run_when(p3, p4, [](){ContinueableTask(6);});
    wait_for_all(p1, p2, p3, p4, p5, p6);
}
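For completeness, here is one way the extra overloads used by MoreComplexContinuations might look. Treat it as a rough sketch under a few assumptions rather than the definitive version. It assumes a compiler with variadic templates, so wait_for_all can take any number of tasks. The task_group class in it is a hypothetical stand-in built on std::async so that the sketch compiles without <ppl.h>; with the PPL you would delete it and use the real one. I have also added a g_finished log to ContinueableTask purely so the ordering can be inspected. The two-dependency run_when does its waiting inside the new task, the same way task2 did in SimpleContinuationTasks.

```cpp
#include <cassert>
#include <cstdio>
#include <future>
#include <memory>
#include <mutex>
#include <vector>

// ASSUMPTION: hypothetical stand-in for the PPL's task_group, for portability only.
class task_group {
public:
    template<typename Func>
    void run(Func fn) {
        std::shared_future<void> f = std::async(std::launch::async, fn).share();
        std::lock_guard<std::mutex> lock(m_);
        pending_.push_back(f);
    }
    void wait() {
        std::vector<std::shared_future<void>> copy;
        {   // copy under the lock so several threads can wait at once
            std::lock_guard<std::mutex> lock(m_);
            copy = pending_;
        }
        for (auto& f : copy) f.wait();
    }
private:
    std::mutex m_;
    std::vector<std::shared_future<void>> pending_;
};

typedef std::shared_ptr<task_group> shared_task_ptr;

std::mutex g_log_mutex;
std::vector<int> g_finished;  // illustrative: completion order of the tasks

void ContinueableTask(int in) {
    std::printf("building project: %d\n", in);
    std::lock_guard<std::mutex> lock(g_log_mutex);
    g_finished.push_back(in);
}

template<typename Func>
shared_task_ptr run_task(Func fn) {
    shared_task_ptr tasks = std::make_shared<task_group>();
    tasks->run(fn);
    return tasks;
}

// wait_for_all for any number of tasks: an empty base case plus a recursive step.
inline void wait_for_all() {}

template<typename... Rest>
void wait_for_all(shared_task_ptr first, Rest... rest) {
    first->wait();
    wait_for_all(rest...);
}

// One dependency: wait inside the new task, then run the continuation.
template<typename Func>
shared_task_ptr run_when(shared_task_ptr dep, Func fn) {
    return run_task([dep, fn]() { dep->wait(); fn(); });
}

// Two dependencies: same idea, waiting on both before continuing.
template<typename Func>
shared_task_ptr run_when(shared_task_ptr a, shared_task_ptr b, Func fn) {
    return run_task([a, b, fn]() { wait_for_all(a, b); fn(); });
}

void MoreComplexContinuations() {
    auto p1 = run_task([]() { ContinueableTask(1); });
    auto p2 = run_task([]() { ContinueableTask(2); });
    auto p3 = run_task([]() { ContinueableTask(3); });
    auto p4 = run_when(p1, []() { ContinueableTask(4); });
    auto p5 = run_when(p2, p3, []() { ContinueableTask(5); });
    auto p6 = run_when(p3, p4, []() { ContinueableTask(6); });
    wait_for_all(p1, p2, p3, p4, p5, p6);
    assert(g_finished.size() == 6);  // every task has run by now
}
```

The exact order of the printed lines varies from run to run, but 4 never appears before 1, 5 never before 2 or 3, and 6 never before 3 or 4.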

 

Next time, part 2: std::async and std::future

In the next few days, I’ll post part 2 and show how easy it is to implement this pattern with std::async and std::future.
