Modern concurrency for C++. Tasks, executors, timers and C++20 coroutines to rule them all

concurrencpp, the C++ concurrency library

License: MIT

concurrencpp is a tasking library for C++ allowing developers to write highly concurrent applications easily and safely by using tasks, executors and coroutines. By using concurrencpp applications can break down big procedures that need to be processed asynchronously into smaller tasks that run concurrently and work in a co-operative manner to achieve the wanted result. concurrencpp also allows applications to write parallel algorithms easily by using parallel coroutines.

concurrencpp's main advantages are:

  • Being able to write modern concurrency code without having to rely on low-level concurrency primitives like locks and condition variables.
  • Being able to write highly concurrent and parallel applications that scale automatically to use all hardware resources, as needed.
  • Being able to write non-blocking, synchronous-like code easily by using C++20 coroutines and the co_await keyword.
  • Reducing the possibility of race conditions, data races and deadlocks by using high-level objects with built-in synchronization.
  • Providing various types of commonly used executors with complete coroutine integration.
  • Allowing applications to extend the library by implementing their own executors.

concurrencpp overview

concurrencpp is a task-centric library. A task is an asynchronous operation. Tasks offer a higher level of abstraction for concurrent code than traditional thread-centric approaches. Tasks can be chained together, meaning that tasks pass their asynchronous result from one to another, where the result of one task is used as if it were a parameter or an intermediate value of another ongoing task. Tasks allow applications to utilize available hardware resources better and scale much more than using raw threads, since tasks can be suspended, waiting for another task to produce a result, without blocking underlying OS-threads. Tasks bring much more productivity to developers by allowing them to focus more on business-logic and less on low-level concepts like thread management and inter-thread synchronization.

While tasks specify what actions have to be executed, executors are worker-objects that specify where and how to execute tasks. Executors spare applications the managing of thread pools and task queues themselves. Executors also decouple those concepts away from application code, by providing a unified API for creating and scheduling tasks.

Tasks communicate with each other using result objects. A result object is an asynchronous pipe that passes the asynchronous result of one task to another ongoing task. Results can be awaited and resolved in a non-blocking manner.

These three concepts (the task, the executor and the associated result) are the building blocks of concurrencpp. Executors run tasks that communicate with each other by sending results through result objects. Tasks, executors and result objects work together symbiotically to produce concurrent code which is fast and clean.

concurrencpp is built around the RAII concept. In order to use tasks and executors, applications create a runtime instance in the beginning of the main function. The runtime is then used to acquire existing executors and register new user-defined executors. Executors are used to create and schedule tasks to run, and they might return a result object that can be used to marshal the asynchronous result to another task that acts as its consumer. When the runtime is destroyed, it iterates over every stored executor and calls its shutdown method. Every executor then exits gracefully. Unscheduled tasks are destroyed, and attempts to create new tasks will throw an exception.

"Hello world" program using concurrencpp:

#include "concurrencpp/concurrencpp.h"
#include <iostream>

int main() {
	concurrencpp::runtime runtime;
	auto result = runtime.thread_executor()->submit([] {
		std::cout << "hello world" << std::endl;
	});

	result.get();
	return 0;
}

In this basic example, we created a runtime object, then we acquired the thread executor from the runtime. We used submit to pass a lambda as our callable. This lambda returns void, hence the executor returns a result<void> object that marshals the asynchronous result back to the caller. main calls get, which blocks the main thread until the result becomes ready. If no exception was thrown, get returns void. If an exception was thrown, get re-throws it. Asynchronously, thread_executor launches a new thread of execution and runs the given lambda. It implicitly co_returns void and the task is finished. main is then unblocked.

Concurrent even-number counting:

#include "concurrencpp/concurrencpp.h"

#include <iostream>
#include <vector>
#include <algorithm>
#include <memory>

#include <cstdlib>
#include <ctime>

using namespace concurrencpp;

std::vector<int> make_random_vector() {
    std::vector<int> vec(64 * 1'024);

    std::srand(std::time(nullptr));
    for (auto& i : vec) {
        i = ::rand();
    }

    return vec;
}

result<size_t> count_even(std::shared_ptr<thread_pool_executor> tpe, const std::vector<int>& vector) {
    const auto vector_size = vector.size();
    const auto concurrency_level = static_cast<size_t>(tpe->max_concurrency_level());
    const auto chunk_size = vector_size / concurrency_level;

    std::vector<result<size_t>> chunk_count;

    for (size_t i = 0; i < concurrency_level; i++) {
        const auto chunk_begin = i * chunk_size;
        // The last chunk also takes the remainder, so no element is skipped.
        const auto chunk_end = (i + 1 == concurrency_level) ? vector_size : chunk_begin + chunk_size;
        auto result = tpe->submit([&vector, chunk_begin, chunk_end]() -> size_t {
            return std::count_if(vector.begin() + chunk_begin, vector.begin() + chunk_end, [](auto i) {
                return i % 2 == 0;
            });
        });

        chunk_count.emplace_back(std::move(result));
    }

    size_t total_count = 0;

    for (auto& result : chunk_count) {
        total_count += co_await result;
    }

    co_return total_count;
}

int main() {
    concurrencpp::runtime runtime;
    const auto vector = make_random_vector();
    auto result = count_even(runtime.thread_pool_executor(), vector);
    const auto total_count = result.get();
    std::cout << "there are " << total_count << " even numbers in the vector" << std::endl;
    return 0;
}

In this example, we start the program by creating a runtime object. We create a vector filled with random numbers, then we acquire the thread_pool_executor from the runtime and call count_even. count_even is a coroutine that spawns more tasks and co_awaits them inside. max_concurrency_level returns the maximum number of workers that the executor supports. In the thread pool executor's case, the number of workers is calculated from the number of cores. We then partition the vector to match the number of workers and send every chunk to be processed in its own task. Asynchronously, the workers count how many even numbers each chunk contains and return the result. count_even sums the results by pulling each count with co_await, and the final result is then co_returned. The main thread, which was blocked by calling get, is unblocked and the total count is returned. main prints the number of even numbers and the program terminates gracefully.
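The partitioning step can be looked at in isolation. The following is a minimal sketch using only the standard library; the helper name split_into_chunks is hypothetical and not part of concurrencpp. It computes half-open [begin, end) index ranges for a given element count and worker count, and shows one way to handle a size that is not evenly divisible: the last range absorbs the remainder.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical helper, not part of concurrencpp: splits [0, size) into
// `workers` half-open ranges. The last range absorbs the remainder.
std::vector<std::pair<std::size_t, std::size_t>> split_into_chunks(std::size_t size, std::size_t workers) {
    assert(workers > 0);
    const std::size_t chunk_size = size / workers;

    std::vector<std::pair<std::size_t, std::size_t>> chunks;
    chunks.reserve(workers);

    for (std::size_t i = 0; i < workers; i++) {
        const std::size_t begin = i * chunk_size;
        const std::size_t end = (i + 1 == workers) ? size : begin + chunk_size;
        chunks.emplace_back(begin, end);
    }

    return chunks;
}
```

Each pair could then be captured by one submitted task. When there are fewer elements than workers, all ranges except the last come out empty, which is harmless for a counting task.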

Tasks

Every big or complex operation can be broken down into smaller, chainable steps. Tasks are asynchronous operations implementing those computational steps. Tasks can run anywhere with the help of executors. While tasks can be created from regular callables (such as functors and lambdas), tasks are mostly used with coroutines, which allow smooth suspension and resumption. In concurrencpp, the task concept is represented by the concurrencpp::task class. Although the task concept is central to concurrencpp, applications will rarely have to create and manipulate task objects themselves, as task objects are created and scheduled by the runtime with no external help.

concurrencpp coroutines

concurrencpp allows applications to produce and consume coroutines as the main way of creating tasks. concurrencpp coroutines are eager and start to run the moment they are invoked (as opposed to lazy coroutines, which start to run only when co_awaited). concurrencpp coroutines can return any of concurrencpp::result or concurrencpp::null_result.

concurrencpp::result tells the coroutine to marshal the returned value or the thrown exception while concurrencpp::null_result tells the coroutine to drop and ignore any of them.

When a function returns any of concurrencpp::result or concurrencpp::null_result and contains at least one co_await or co_return in its body, the function is a concurrencpp coroutine. Every valid concurrencpp coroutine is a valid task. In our count-even example above, count_even is such a coroutine. We first spawned count_even, then inside it the thread pool executor spawned more child tasks (created from regular callables), which were eventually joined using co_await.

Coroutines can start to run synchronously, in the caller thread. This kind of coroutine is called a "regular coroutine". concurrencpp coroutines can also start to run in parallel, inside a given executor. This kind of coroutine is called a "parallel coroutine".

Executors

A concurrencpp executor is an object that is able to schedule and run tasks. Executors simplify the work of managing resources such as threads, thread pools and task queues by decoupling them away from application code. Executors provide a unified way of scheduling and executing tasks, since they all extend concurrencpp::executor.

executor API

class executor {
	/*
		Initializes a new executor and gives it a name.
	*/
	executor(std::string_view name);

	/*
		Destroys this executor.
	*/
	virtual ~executor() noexcept = default;

	/*
		The name of the executor, used for logging and debugging.
	*/
	const std::string name;

	/*
		Schedules a task to run in this executor. 
		Throws concurrencpp::errors::executor_shutdown exception if shutdown was called before.
	*/
	virtual void enqueue(concurrencpp::task task) = 0;

	/*
		Schedules a range of tasks to run in this executor. 
		Throws concurrencpp::errors::executor_shutdown exception if shutdown was called before.
	*/	
	virtual void enqueue(std::span<concurrencpp::task> tasks) = 0;

	/*
		Returns the maximum count of real OS threads this executor supports. 
		The actual count of threads this executor is running might be smaller than this number. 
		Returns std::numeric_limits<int>::max() if the executor does not have a limit for OS threads.
	*/
	virtual int max_concurrency_level() const noexcept = 0;

	/* 
		Returns true if shutdown was called before, false otherwise. 
	*/ 
	virtual bool shutdown_requested() const noexcept = 0;

	/* 
		Shuts down the executor:
		- Tells underlying threads to exit their work loop and joins them.
		- Destroys unexecuted coroutines.
		- Makes subsequent calls to enqueue, post, submit, bulk_post and 
			bulk_submit to throw concurrencpp::errors::executor_shutdown exception.
		- Makes shutdown_requested return true.
	*/
	virtual void shutdown() noexcept = 0;

	/*
		Turns a callable and its arguments into a task object and schedules it to run in this executor using enqueue.
		Arguments are passed to the task by decaying them first.
 		Throws errors::executor_shutdown exception if shutdown has been called before.
	*/
	template<class callable_type, class ... argument_types>
	void post(callable_type&& callable, argument_types&& ... arguments);
	
	/*
		Like post, but returns a result object that marshals the asynchronous result.
		Throws errors::executor_shutdown exception if shutdown has been called before.
	*/
	template<class callable_type, class ... argument_types>
	result<type> submit(callable_type&& callable, argument_types&& ... arguments);

	/*
		Turns an array of callables into an array of tasks and schedules them to run in this executor using enqueue.
		Throws errors::executor_shutdown exception if shutdown has been called before.
	*/
	template<class callable_type>
	void bulk_post(std::span<callable_type> callable_list);

	/*
		Like bulk_post, but returns an array of result objects that marshal the asynchronous results.
		Throws errors::executor_shutdown exception if shutdown has been called before. 
	*/	
	template<class callable_type>
	std::vector<concurrencpp::result<type>> bulk_submit(std::span<callable_type> callable_list);
};

Executor types

As mentioned above, concurrencpp provides commonly used executors. These executor types are:

  • thread pool executor - a general purpose executor that maintains a pool of threads. The thread pool executor is suitable for short cpu-bound tasks that don't block. Applications are encouraged to use this executor as the default executor for non-blocking tasks. The concurrencpp thread pool provides dynamic thread injection and dynamic work balancing.

  • blocking executor - a threadpool executor with a larger pool of threads. Suitable for launching short blocking tasks like file io and db queries.

  • thread executor - an executor that launches each enqueued task to run on a new thread of execution. Threads are not reused. This executor is good for long running tasks, like objects that run a work loop, or long blocking operations.

  • worker thread executor - a single thread executor that maintains a single task queue. Suitable when applications want a dedicated thread that executes many related tasks.

  • manual executor - an executor that does not execute coroutines by itself. Application code can execute previously enqueued tasks by manually invoking its execution methods.

  • derivable executor - a base class for user defined executors. Although inheriting directly from concurrencpp::executor is possible, derivable_executor uses the CRTP pattern that provides some optimization opportunities for the compiler.

  • inline executor - mainly used to override the behavior of other executors. Enqueuing a task is equivalent to invoking it inline.
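To make the manual executor idea concrete, here is a toy model written with the standard library only. It is not the concurrencpp class: the names toy_manual_executor, run_one and pending are hypothetical, and the sketch is single-threaded with no synchronization. It only shows the core contract described above, namely that enqueuing stores a task and nothing runs until application code asks for it.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Toy model of the "manual executor" idea; NOT the concurrencpp class.
// All names here are hypothetical and the class is not thread-safe.
class toy_manual_executor {
public:
    // Stores the task; nothing runs yet.
    void enqueue(std::function<void()> task) {
        m_tasks.push_back(std::move(task));
    }

    // Runs the oldest pending task on the calling thread.
    // Returns false if the queue was empty.
    bool run_one() {
        if (m_tasks.empty()) {
            return false;
        }

        auto task = std::move(m_tasks.front());
        m_tasks.pop_front();
        task();
        return true;
    }

    // Number of tasks that were enqueued but not executed yet.
    std::size_t pending() const noexcept {
        return m_tasks.size();
    }

private:
    std::deque<std::function<void()>> m_tasks;
};
```

This style is mostly useful when an existing loop (a UI loop or a test harness, for example) should decide exactly when queued work runs.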

Using executors

The bare mechanism of an executor is encapsulated in its enqueue method. This method enqueues a task for execution and has two overloads: One overload receives a single task object as an argument, and another that receives a span of task objects. The second overload is used to enqueue a batch of tasks. This allows better scheduling heuristics and decreased contention.

Applications don't have to rely on enqueue alone: concurrencpp::executor provides an API for scheduling user callables by converting them to task objects behind the scenes. Applications can request executors to return a result object that marshals the asynchronous result of the provided callable. This is done by calling executor::submit and executor::bulk_submit. submit gets a callable and returns a result object. executor::bulk_submit gets a span of callables and returns a vector of result objects, in a similar way to submit. In many cases, applications are not interested in the asynchronous value or exception. In such cases, applications can use executor::post and executor::bulk_post to schedule a callable or a span of callables to be executed, while telling the task to drop any returned value or thrown exception. Not marshaling the asynchronous result is faster than marshaling it, but then we have no way of knowing the status or the result of the ongoing task.

post, bulk_post, submit and bulk_submit use enqueue behind the scenes for the underlying scheduling mechanism.
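The layering can be sketched with the standard library. The class below is a toy model and not the concurrencpp implementation: toy_inline_executor and its members are hypothetical names, std::function stands in for concurrencpp::task, and std::future stands in for the result object. enqueue is the only scheduling primitive; post wraps the callable so that its outcome is dropped, while submit wraps it so that the value or exception is marshaled back.

```cpp
#include <functional>
#include <future>
#include <memory>
#include <utility>

// Toy model, NOT the concurrencpp implementation. All names are hypothetical.
// enqueue is the only scheduling primitive; post and submit are layered on it.
class toy_inline_executor {
public:
    int enqueued_count = 0;

    // Inline policy: "scheduling" a task means running it right away.
    void enqueue(std::function<void()> task) {
        ++enqueued_count;
        task();
    }

    // Fire and forget: the return value and any exception are dropped.
    template<class callable_type>
    void post(callable_type callable) {
        enqueue([callable = std::move(callable)]() mutable {
            try {
                callable();
            } catch (...) {
                // Deliberately ignored: nobody is listening for the outcome.
            }
        });
    }

    // Like post, but the value or exception is marshaled through a future.
    template<class callable_type>
    auto submit(callable_type callable) -> std::future<decltype(callable())> {
        using return_type = decltype(callable());
        // std::function requires copyable targets, so the move-only
        // packaged_task is held through a shared_ptr.
        auto task = std::make_shared<std::packaged_task<return_type()>>(std::move(callable));
        auto future = task->get_future();
        enqueue([task] { (*task)(); });
        return future;
    }
};
```

Because both helpers funnel into enqueue, swapping the scheduling policy (a queue, a thread pool) would only require changing that one method.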

Result objects

Asynchronous values and exceptions can be consumed using the concurrencpp result objects. A result object is a pipe for the asynchronous result, like std::future. When a task finishes execution, it either returns a valid value or throws an exception. In either case, this asynchronous result is marshaled to the consumer of the result object. The result status therefore varies from idle (the asynchronous value or exception isn't ready yet) to value (the task terminated by returning a valid value) to exception (the task terminated by throwing an exception).
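The three states can be modeled in a few lines. This is a toy, standard-library-only sketch and not how concurrencpp stores results: toy_status and toy_result_state are hypothetical names, there is no synchronization, and the value type is assumed to be default-constructible.

```cpp
#include <exception>
#include <stdexcept>
#include <utility>

// Toy model, NOT concurrencpp; names are hypothetical and there is no
// synchronization, so it only illustrates the three states.
enum class toy_status { idle, value, exception };

template<class type>
class toy_result_state {
public:
    toy_status status() const noexcept {
        return m_status;
    }

    // The producer finished by returning a value.
    void set_value(type value) {
        m_value = std::move(value);
        m_status = toy_status::value;
    }

    // The producer finished by throwing.
    void set_exception(std::exception_ptr error) {
        m_error = std::move(error);
        m_status = toy_status::exception;
    }

    // Returns the value, or rethrows the stored exception.
    type get() {
        switch (m_status) {
            case toy_status::value:
                return std::move(m_value);
            case toy_status::exception:
                std::rethrow_exception(m_error);
            default:
                throw std::logic_error("result is still idle");
        }
    }

private:
    toy_status m_status = toy_status::idle;
    type m_value{};  // assumes a default-constructible type
    std::exception_ptr m_error;
};
```

A real result object additionally has to synchronize the producer with the consumer and, per the text above, resume or unblock whoever is waiting.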

Result objects are a move-only type, and as such, they cannot be used after their content was moved to another result object. In this case, the result object is considered to be empty and attempts to call any method other than operator bool and operator = will throw. After the asynchronous result has been pulled out of the result object (by calling get, await or await_via), the result object becomes empty. Emptiness can be tested with operator bool.
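Since result objects are compared to std::future above, the standard type makes for a quick analogy of emptiness. The sketch below is not concurrencpp code; emptiness_report and observe_future_emptiness are hypothetical names. std::future::valid plays the role that operator bool plays for a result: it turns false for a moved-from future and again after the value has been pulled with get.

```cpp
#include <future>
#include <utility>

// Standard-library analogy only; concurrencpp::result is a different type.
// Reports whether each future is non-empty at three points in its life.
struct emptiness_report {
    bool source_after_move;  // the moved-from future
    bool target_before_get;  // the future that received the content
    bool target_after_get;   // the same future once the value was pulled
    int value;
};

emptiness_report observe_future_emptiness() {
    std::promise<int> promise;
    std::future<int> source = promise.get_future();
    promise.set_value(42);

    std::future<int> target = std::move(source);

    emptiness_report report{};
    report.source_after_move = source.valid();
    report.target_before_get = target.valid();
    report.value = target.get();
    report.target_after_get = target.valid();
    return report;
}
```

One difference worth keeping in mind: calling get on an invalid std::future is undefined behavior, whereas the text above states that an empty result object throws.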

Results can be polled, waited, awaited or resolved.

Result objects can be polled for their status by calling result::status.

Results can be waited on by calling any of result::wait, result::wait_for, result::wait_until or result::get. Waiting on a result is a blocking operation (if the asynchronous result is not ready), and will suspend the entire thread of execution until the asynchronous result becomes available. Waiting operations are generally discouraged and should only be used in root-level tasks or in contexts which allow it, like blocking the main thread waiting for the rest of the application to finish gracefully, or using concurrencpp::blocking_executor or concurrencpp::thread_executor.

Awaiting a result means suspending the current coroutine until the asynchronous result is ready. If a valid value was returned from the associated task, it is returned from the result object. If the associated task threw an exception, it is re-thrown. At the moment of awaiting, if the result is already ready, the current coroutine resumes immediately. Otherwise, it is resumed by the thread that sets the asynchronous result or exception.

The behavior of awaiting result objects can be further fine-tuned by using await_via. This method accepts an executor and a boolean flag (force_rescheduling). If, at the moment of awaiting, the result is already ready, the behavior depends on the value of force_rescheduling. If force_rescheduling is true, the current coroutine is forcefully suspended and resumed inside the given executor. If force_rescheduling is false, the current coroutine is resumed immediately in the calling thread. If the asynchronous result is not ready at the moment of awaiting, the current coroutine is resumed after the result is set, by scheduling it to run in the given executor.
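The rules in the paragraph above amount to a small decision table. The sketch below only tabulates them; resume_site and await_via_resume_site are hypothetical names and nothing here calls into concurrencpp.

```cpp
// Hypothetical names, used only to tabulate the rules described above.
enum class resume_site {
    calling_thread,  // the coroutine continues immediately where it awaited
    given_executor   // the coroutine is scheduled to resume in the executor
};

constexpr resume_site await_via_resume_site(bool result_ready, bool force_rescheduling) noexcept {
    if (!result_ready) {
        // Not ready: resumption always goes through the executor.
        return resume_site::given_executor;
    }

    // Ready: the flag decides between rescheduling and continuing inline.
    return force_rescheduling ? resume_site::given_executor : resume_site::calling_thread;
}
```

In other words, passing force_rescheduling = true guarantees that the code after the co_await runs in the given executor, whichever way the race between producer and consumer went.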

Resolving a result is similar to awaiting it. The difference is that the co_await expression will return the result object itself, in a non-empty form, in a ready state. The asynchronous result can then be pulled by using get or co_await. Just like await_via, resolve_via fine-tunes the control flow of the coroutine by passing an executor and a flag suggesting how to behave when the result is already ready.

Awaiting a result object by using co_await (and by doing so, turning the current function/task into a coroutine as well) is the preferred way of consuming result objects, as it does not block underlying threads.

result API

template<class type>
class result {
	/*
		Creates an empty result that isn't associated with any task.
	*/
	result() noexcept = default;

	/*
		Destroys the result. Associated tasks are not cancelled.
		The destructor does not block waiting for the asynchronous result to become ready.
	*/	
	~result() noexcept = default;

	/*
		Moves the content of rhs to *this. After this call, rhs is empty. 
	*/
	result(result&& rhs) noexcept = default;

	/*
		Moves the content of rhs to *this. After this call, rhs is empty. Returns *this.		
	*/
	result& operator = (result&& rhs) noexcept = default;

	/*
		Returns true if this is a non-empty result.
		Applications must not use this object if this->operator bool() is false. 
	*/
	operator bool() const noexcept;

	/*
		Queries the status of *this.
		The return value is any of result_status::idle, result_status::value or result_status::exception.
		Throws concurrencpp::errors::empty_result if *this is empty.		
	*/
	result_status status() const;

	/*
		Blocks the current thread of execution until this result is ready, that is, until status() != result_status::idle.
		Throws concurrencpp::errors::empty_result if *this is empty.					
	*/
	void wait();

	/*
		Blocks until this result is ready or duration has passed. Returns the status of this result after unblocking.
		Throws concurrencpp::errors::empty_result if *this is empty.					
	*/
	template<class duration_unit, class ratio>
	result_status wait_for(std::chrono::duration<duration_unit, ratio> duration);

	/*
		Blocks until this result is ready or timeout_time has been reached. Returns the status of this result after unblocking.
		Throws concurrencpp::errors::empty_result if *this is empty.					
	*/
	template< class clock, class duration >
	result_status wait_until(std::chrono::time_point<clock, duration> timeout_time);

	/*
		Blocks the current thread of execution until this result is ready, that is, until status() != result_status::idle.
		If the result is a valid value, it is returned, otherwise, get rethrows the asynchronous exception.		
		Throws concurrencpp::errors::empty_result if *this is empty.					
	*/
	type get();

	/*
		Returns an awaitable used to await this result.
		If the result is already ready - the current coroutine resumes immediately in the calling thread of execution.
		If the result is not ready yet, the current coroutine is suspended and resumed when the asynchronous result is ready,
		by the thread which had set the asynchronous value or exception.
		In either way, after resuming, if the result is a valid value, it is returned. 
		Otherwise, operator co_await rethrows the asynchronous exception.
		Throws concurrencpp::errors::empty_result if *this is empty.							
	*/
	auto operator co_await();

	/*
		Returns an awaitable used to await this result.
		If the result is not ready yet, the current coroutine is suspended and resumed when the asynchronous result is ready,
		by scheduling the current coroutine via executor.
		If the result is already ready - the behaviour depends on the value of force_rescheduling:
			If force_rescheduling = true, then the current coroutine is forcefully suspended and resumed via executor.
			If force_rescheduling = false, then the current coroutine resumes immediately in the calling thread of execution.
		In either way, after resuming, if the result is a valid value, it is returned. 
		Otherwise, operator co_await rethrows the asynchronous exception.
		Throws concurrencpp::errors::empty_result if *this is empty.		
		Throws std::invalid_argument if executor is null.
		If this result is ready and force_rescheduling=true, throws any exception that executor::enqueue may throw.	
	*/
	auto await_via(
		std::shared_ptr<concurrencpp::executor> executor,
		bool force_rescheduling = true);

	/*
		Returns an awaitable used to resolve this result.
		After co_await expression finishes, *this is returned in a non-empty form, in a ready state.
		Throws concurrencpp::errors::empty_result if *this is empty.
	*/	
	auto resolve();

	/*
		Returns an awaitable used to resolve this result.
		If the result is not ready yet, the current coroutine is suspended and resumed when the asynchronous result is ready,
		by scheduling the current coroutine via executor.
		If the result is already ready - the behaviour depends on the value of force_rescheduling:
			If force_rescheduling = true, then the current coroutine is forcefully suspended and resumed via executor.
			If force_rescheduling = false, then the current coroutine resumes immediately in the calling thread of execution.
		In either case, after resuming, *this is returned in a non-empty form and its status is guaranteed not to be result_status::idle.
		Throws concurrencpp::errors::empty_result if *this is empty.		
		Throws std::invalid_argument if executor is null.
		If this result is ready and force_rescheduling=true, throws any exception that executor::enqueue may throw.					
	*/
	auto resolve_via(
		std::shared_ptr<concurrencpp::executor> executor,
		bool force_rescheduling = true);
};

Parallel coroutines

Regular coroutines start to run synchronously in the calling thread of execution. Execution might shift to another thread of execution if the coroutine undergoes a rescheduling, for example by awaiting an unready result object inside it. concurrencpp also provides parallel coroutines, which start to run inside a given executor, not in the invoking thread of execution. This style of scheduling coroutines is especially helpful when writing parallel algorithms, recursive algorithms and concurrent algorithms that use the fork-join model.

Every parallel coroutine must meet the following preconditions:

  1. Returns any of result / null_result.
  2. Gets executor_tag as its first argument.
  3. Gets any of type* / type& / std::shared_ptr<type>, where type is a concrete class of executor, as its second argument.
  4. Contains any of co_await or co_return in its body.

If all of the above apply, the function is a parallel coroutine: concurrencpp will start the coroutine suspended and immediately reschedule it to run in the provided executor. concurrencpp::executor_tag is a dummy placeholder that tells the concurrencpp runtime that this function is not a regular function and needs to start running inside the given executor. Applications can then consume the result of the parallel coroutine by using the returned result object.

Parallel Fibonacci example:

#include "concurrencpp/concurrencpp.h"
#include <iostream>

using namespace concurrencpp;

int fibonacci_sync(int i) {
	if (i == 0) {
		return 0;
	}

	if (i == 1) {
		return 1;
	}

	return fibonacci_sync(i - 1) + fibonacci_sync(i - 2);
}

result<int> fibonacci(executor_tag, std::shared_ptr<thread_pool_executor> tpe, const int curr) {
	if (curr <= 10) {
		co_return fibonacci_sync(curr);
	}

	auto fib_1 = fibonacci({}, tpe, curr - 1);
	auto fib_2 = fibonacci({}, tpe, curr - 2);

	co_return co_await fib_1 + co_await fib_2;
}

int main() {
	concurrencpp::runtime runtime;
	auto fibb_30 = fibonacci({}, runtime.thread_pool_executor(), 30).get();
	std::cout << "fibonacci(30) = " << fibb_30 << std::endl;
	return 0;
}

In this example, we calculate the 30th member of the Fibonacci sequence in a parallel manner. We start by launching each Fibonacci step in its own parallel coroutine. The first argument is a dummy executor_tag and the second argument is the thread pool executor. Every recursive step invokes a new parallel coroutine that runs in parallel. Each result is co_returned to its parent task and acquired by using co_await.
When we deem the input to be small enough to be calculated synchronously (when curr <= 10), we stop executing each recursive step in its own task and just solve the algorithm synchronously.
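For contrast, the same fork-join shape with a sequential cutoff can be sketched with std::async from the standard library. This is an analogy and not concurrencpp code; fib_sequential and fib_fork_join are hypothetical names, and the cutoff is passed as a parameter here instead of being hard-coded. The important difference is in the join: left.get() blocks the calling thread until the forked branch finishes, while co_await in the example above suspends the coroutine without blocking a thread.

```cpp
#include <future>

// Sequential fallback for small inputs.
int fib_sequential(int i) {
    if (i < 2) {
        return i;
    }

    return fib_sequential(i - 1) + fib_sequential(i - 2);
}

// Standard-library analogy, NOT concurrencpp: forks one branch onto a new
// thread and joins it with a blocking get(). `cutoff` is the input size at or
// below which the work is done sequentially.
int fib_fork_join(int n, int cutoff) {
    if (n <= cutoff || n < 2) {
        return fib_sequential(n);
    }

    // Fork: the (n - 1) branch runs on its own thread...
    auto left = std::async(std::launch::async, fib_fork_join, n - 1, cutoff);
    // ...while the (n - 2) branch runs on the current thread.
    const int right = fib_fork_join(n - 2, cutoff);

    // Join: blocks this thread until the forked branch is done.
    return left.get() + right;
}
```

With std::launch::async every fork may create a thread, so this sketch is only reasonable when the cutoff keeps the number of forks small, for example fib_fork_join(15, 10).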

To compare, this is how the same code is written without using parallel coroutines, relying on executor::submit alone. Since fibonacci returns a result<int>, submitting it recursively via executor::submit will result in a result<result<int>>.

#include "concurrencpp/concurrencpp.h"
#include <iostream>

using namespace concurrencpp;

int fibonacci_sync(int i) {
    if (i == 0) {
        return 0;
    }

    if (i == 1) {
        return 1;
    }

    return fibonacci_sync(i - 1) + fibonacci_sync(i - 2);
}

result<int> fibonacci(std::shared_ptr<thread_pool_executor> tpe, const int curr) {
    if (curr <= 10) {
        co_return fibonacci_sync(curr);
    }

    auto fib_1 = tpe->submit(fibonacci, tpe, curr - 1);
    auto fib_2 = tpe->submit(fibonacci, tpe, curr - 2);

    co_return co_await co_await fib_1 + co_await co_await fib_2;
}

int main() {
    concurrencpp::runtime runtime;
    auto fibb_30 = fibonacci(runtime.thread_pool_executor(), 30).get();
    std::cout << "fibonacci(30) = " << fibb_30 << std::endl;
    return 0;
}

Result-promises

Result objects are the main way to pass data between tasks in concurrencpp and we've seen how executors and coroutines produce such objects. Sometimes we want to use the capabilities of a result object with non-tasks, for example when using a third-party library. In this case, we can complete a result object by using a result_promise. result_promise resembles a std::promise object - applications can manually set the asynchronous result or exception and make the associated result object become ready.

Just like result objects, result-promises are a move only type that becomes empty after move. Similarly, after setting a result or an exception, the result promise becomes empty as well. If a result-promise gets out of scope and no result/exception has been set, the result-promise destructor sets a concurrencpp::errors::broken_task exception using the set_exception method. Suspended and blocked tasks waiting for the associated result object are resumed/unblocked.
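The standard library has a close analogue of this behavior, which can serve as a mental model. The sketch below is not concurrencpp code and abandoned_promise_reports_error is a hypothetical name: a std::promise destroyed without a value stores a std::future_error with the code broken_promise, so the consumer gets an error instead of waiting forever.

```cpp
#include <future>

// Standard-library analogy, NOT concurrencpp: a promise destroyed without a
// value makes the consumer observe an error instead of waiting forever.
bool abandoned_promise_reports_error() {
    std::future<int> future;

    {
        std::promise<int> promise;
        future = promise.get_future();
        // promise goes out of scope here without set_value/set_exception.
    }

    try {
        future.get();
    } catch (const std::future_error& error) {
        return error.code() == std::future_errc::broken_promise;
    }

    return false;
}
```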

Result promises can convert callback style of code into async/await style of code: whenever a component requires a callback to marshal the asynchronous result, we can pass a callback that calls set_result or set_exception (depending on the asynchronous result itself) on the passed result promise, and return the associated result.
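The callback-to-result conversion follows a general pattern that can be sketched with std::promise and std::future. This is an analogy under stated assumptions and not concurrencpp code: callback_to_future is a hypothetical name, and the operation being adapted is assumed to have the shape "takes a completion callback and invokes it once with the value".

```cpp
#include <functional>
#include <future>
#include <memory>
#include <utility>

// Standard-library analogy, NOT concurrencpp; all names are hypothetical.
// `start` is any callback-style operation: it receives a completion callback
// and invokes it once (possibly later, possibly on another thread).
template<class type>
std::future<type> callback_to_future(const std::function<void(std::function<void(type)>)>& start) {
    // shared_ptr keeps the promise alive for as long as the callback exists
    // and makes the callback copyable, as std::function requires.
    auto promise = std::make_shared<std::promise<type>>();
    std::future<type> future = promise->get_future();

    // The callback handed to the operation completes the promise.
    start([promise](type value) {
        promise->set_value(std::move(value));
    });

    return future;
}
```

The consumer object is extracted before the operation is started, so the caller always gets something to wait on or await, regardless of when the callback fires. A fuller version would also route failures through set_exception.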

result_promise API

template <class type>
class result_promise {	
	/*
		Constructs a valid result_promise.
	*/
	result_promise();

	/*
		Moves the content of rhs to *this. After this call, rhs is empty.
	*/		
	result_promise(result_promise&& rhs) noexcept;

	/*
		Destroys *this, possibly setting a concurrencpp::errors::broken_task exception
		by calling set_exception if *this is not empty at the time of destruction.
	*/		
	~result_promise() noexcept;

	/*
		Moves the content of rhs to *this. After this call, rhs is empty. 
	*/		
	result_promise& operator = (result_promise&& rhs) noexcept;

	/*
		Returns true if this is a non-empty result-promise.
		Applications must not use this object if this->operator bool() is false. 
	*/
	explicit operator bool() const noexcept;

	/*
		Sets a value by constructing <<type>> from arguments... in-place. 
		Makes the associated result object become ready - tasks waiting for it to become ready are unblocked. 
		Suspended tasks are resumed either inline or via the executor that was provided by calling result::await_via or result::resolve_via.
		After this call, *this becomes empty.
		If *this is empty, a concurrencpp::errors::empty_result_promise exception is thrown.
	*/
	template<class ... argument_types>
	void set_result(argument_types&& ... arguments);
	
	/*
		Sets an exception.
		Makes the associated result object become ready - tasks waiting for it to become ready are unblocked.
		Suspended tasks are resumed either inline or via the executor that was provided by calling result::await_via or result::resolve_via.
		After this call, *this becomes empty.
		If *this is empty, a concurrencpp::errors::empty_result_promise exception is thrown.
		If exception_ptr is null, an std::invalid_argument exception is thrown.
	*/
	void set_exception(std::exception_ptr exception_ptr);

	/*
		A convenience method that invokes a callable with arguments... and calls set_result with the result of the invocation. 
		If an exception is thrown, the thrown exception is caught and set instead by calling set_exception.
		After this call, *this becomes empty.
		If *this is empty, a concurrencpp::errors::empty_result_promise exception is thrown.			
	*/
	template<class callable_type, class ... argument_types>
	void set_from_function(callable_type&& callable, argument_types&& ... arguments);
	
	/*
		Gets the associated result object. 
		If *this is empty, a concurrencpp::errors::empty_result_promise exception is thrown.
		If this method had been called before, a concurrencpp::errors::result_already_retrieved exception is thrown.
	*/
	result<type> get_result();
};

Example: Marshaling asynchronous result using result_promise:

#include "concurrencpp/concurrencpp.h"

#include <chrono>
#include <iostream>
#include <string>
#include <thread>

int main() {
	concurrencpp::result_promise<std::string> promise;
	auto result = promise.get_result();

	std::thread my_3_party_executor([promise = std::move(promise)] () mutable {
		std::this_thread::sleep_for(std::chrono::seconds(1)); //Imitate real work 
		promise.set_result("hello world");
	});

	auto asynchronous_string = result.get();
	std::cout << "result promise returned string: " << asynchronous_string << std::endl;

	my_3_party_executor.join();
}

In this example, we use std::thread as a third-party executor. This represents a scenario in which a non-concurrencpp executor is used as part of the application life-cycle. We extract the result object before we pass the promise, and block the main thread until the result becomes ready. In my_3_party_executor, we set a result as if we co_returned it.
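
The exception-capturing behaviour described for set_from_function can be illustrated without the library. The following is a minimal sketch, not concurrencpp's implementation: outcome_sketch, capture_from_function, greet and always_fails are hypothetical names introduced here, and the sketch is single-threaded and handles non-void types only. It shows the underlying idea of invoking a callable, storing either its value or the exception it threw, and re-throwing that exception when the value is consumed.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical stand-in for the state shared by a promise and its result:
// it holds either a value or a captured exception.
template<class type>
struct outcome_sketch {
	std::optional<type> value;
	std::exception_ptr exception;

	// Mirrors consuming a result: returns the value or re-throws the stored exception.
	type get() {
		if (exception) {
			std::rethrow_exception(exception);
		}

		assert(value.has_value());
		return std::move(*value);
	}
};

// Invokes callable(arguments...) and records whatever it produced.
template<class type, class callable_type, class ... argument_types>
outcome_sketch<type> capture_from_function(callable_type&& callable, argument_types&& ... arguments) {
	outcome_sketch<type> outcome;
	try {
		outcome.value.emplace(std::invoke(
			std::forward<callable_type>(callable),
			std::forward<argument_types>(arguments)...));
	} catch (...) {
		// Any thrown exception is stored instead of the value.
		outcome.exception = std::current_exception();
	}

	return outcome;
}

// Hypothetical callables used to exercise both paths.
inline std::string greet(const std::string& name) {
	return "hello " + name;
}

inline std::string always_fails(const std::string&) {
	throw std::runtime_error("no greeting available");
}
```

Calling capture_from_function<std::string>(greet, std::string("world")) stores the returned string, while passing always_fails stores the std::runtime_error, which get() then re-throws to the consumer.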

Summary: using tasks and coroutines

A task is an asynchronous operation implementing an asynchronous computational step. Tasks are created by using one of the executor methods or by invoking a concurrencpp coroutine. Tasks might return a result object that is used to consume the asynchronous value or exception the task has produced. When used correctly, result objects don't block; this way, tasks can be chained together, creating a bigger, asynchronous flow graph that never blocks.

A concurrencpp coroutine is a C++ suspendable function. It is eager, meaning it starts to run the moment it is invoked. It returns one of concurrencpp::result / concurrencpp::null_result and contains any of co_await or co_return in its body. Parallel coroutines are a special kind of coroutine that starts to run on another thread, by passing a concurrencpp::executor_tag and an instance of a valid concurrencpp executor as the first arguments.

Result auxiliary functions

For completeness, concurrencpp provides helper functions that help manipulate result objects:

/*
	Creates a ready result object by building <<type>> from arguments&&... in-place.
*/
template<class type, class ... argument_types>
result<type> make_ready_result(argument_types&& ... arguments);

/*
	An overload for void type. 
*/
result<void> make_ready_result();

/*
	Creates a ready result object from an exception pointer.
	The returned result object will re-throw exception_ptr when calling get, await or await_via.
	Throws std::invalid_argument if exception_ptr is null.
*/
template<class type>
result<type> make_exceptional_result(std::exception_ptr exception_ptr);

/*
	Overload. Similar to make_exceptional_result(std::exception_ptr),
	but gets an exception object directly.
*/
template<class type, class exception_type>
result<type> make_exceptional_result(exception_type exception);
 
/*
	Creates a result object that becomes ready when all the input results become ready. 
	Passed result objects are emptied and returned as a tuple.
	Throws std::invalid_argument if any of the passed result objects is empty.
*/
template<class ... result_types>
result<std::tuple<typename std::decay<result_types>::type...>>
   when_all(result_types&& ... results);

/*
	Overload. Similar to when_all(result_types&& ...) but receives a pair of iterators referencing a range. 
	Passed result objects are emptied and returned as a vector.
	If begin == end, the function returns immediately with an empty vector.
	Throws std::invalid_argument if any of the passed result objects is empty.
*/
template<class iterator_type>
result<std::vector<typename std::iterator_traits<iterator_type>::value_type>> 
   when_all(iterator_type begin, iterator_type end);

/*
	Overload. Returns a ready result object that doesn't monitor any asynchronous result.
*/
result<std::tuple<>> when_all();

/*
	Helper struct returned from when_any.
	index is the position of the ready result in results sequence.
	results is either an std::tuple or an std::vector of the results that were passed to when_any.
*/
template <class sequence_type>
struct when_any_result {
	std::size_t index;
	sequence_type results;
};

/*
	Creates a result object that becomes ready when at least one of the input results is ready.
	Passed result objects are emptied and returned as a tuple.
	Throws std::invalid_argument if any of the passed result objects is empty.
*/
template<class ... result_types>
result<when_any_result<std::tuple<result_types...>>>
   when_any(result_types&& ... results);

/*
	Overload. Similar to when_any(result_types&& ...) but receives a pair of iterators referencing a range.
	Passed result objects are emptied and returned as a vector.
	Throws std::invalid_argument if begin == end.
	Throws std::invalid_argument if any of the passed result objects is empty.
*/
template<class iterator_type>
result<when_any_result<std::vector<typename std::iterator_traits<iterator_type>::value_type>>>
   when_any(iterator_type begin, iterator_type end);

Timers and Timer queues

concurrencpp also provides timers and timer queues. Timers are objects that define asynchronous actions running on an executor within a well-defined interval of time. There are three types of timers - regular timers, oneshot timers and delay objects.

Regular timers have four properties that define them:

  1. Callable - a callable that will be scheduled to run as a task periodically.
  2. Executor - an executor that schedules the callable to run periodically.
  3. Due time - the interval in milliseconds, measured from the time of creation, after which the timer is scheduled to run for the first time.
  4. Frequency - the interval in milliseconds, measured from the first time the timer was scheduled to run, at which the callable is scheduled to run periodically, until the timer is destructed or cancelled.

Like other objects in concurrencpp, timers are a move-only type that can be empty. When a timer is destructed or timer::cancel is called, the timer cancels its scheduled but not yet executed tasks. Ongoing tasks are unaffected. The timer callable must be thread safe. It is recommended to set the due time and the frequency of a timer to a granularity of 50 milliseconds.

A timer queue is a concurrencpp worker that manages a collection of timers and processes them in just one thread of execution. It is also the agent used to create new timers. When a timer deadline (whether it is the timer's due time or frequency) has been reached, the timer queue "fires" the timer by scheduling its callable to run on the associated executor as a task.

Just like executors, timer queues also adhere to the RAII concept. When the runtime object gets out of scope, it shuts down the timer queue, cancelling all pending timers. After a timer queue has been shut down, any subsequent call to make_timer, make_one_shot_timer and make_delay_object will throw an errors::timer_queue_shutdown exception. Applications must not try to shut down timer queues by themselves.

timer_queue API:

class timer_queue {
	/*
		Destroys this timer_queue.
	*/
	~timer_queue() noexcept;
	
	/*
		Shuts down this timer_queue:
		Tells the underlying thread of execution to quit and joins it.
		Cancels all pending timers.
		After this call, invocation of any method besides shutdown and shutdown_requested will throw an errors::timer_queue_shutdown.
		If shutdown had been called before, this method has no effect.
	*/
	void shutdown() noexcept;

	/*
		Returns true if shutdown had been called before, false otherwise.
	*/
	bool shutdown_requested() const noexcept;

	/*
		Creates a new running timer where *this is the associated timer_queue.
		Throws std::invalid_argument if executor is null.
		Throws errors::timer_queue_shutdown if shutdown had been called before.
	*/
	template<class callable_type, class ... argument_types>
	timer make_timer(
		std::chrono::milliseconds due_time,
		std::chrono::milliseconds frequency,
		std::shared_ptr<concurrencpp::executor> executor,
		callable_type&& callable,
		argument_types&& ... arguments);

	/*
		Creates a new one-shot timer where *this is the associated timer_queue.
		Throws std::invalid_argument if executor is null.
		Throws errors::timer_queue_shutdown if shutdown had been called before.
	*/
	template<class callable_type, class ... argument_types>
	timer make_one_shot_timer(
		std::chrono::milliseconds due_time,
		std::shared_ptr<concurrencpp::executor> executor,
		callable_type&& callable,
		argument_types&& ... arguments);

	/*
		Creates a new delay object where *this is the associated timer_queue.
		Throws std::invalid_argument if executor is null.
		Throws errors::timer_queue_shutdown if shutdown had been called before.
	*/
	result<void> make_delay_object(
		std::chrono::milliseconds due_time,
		std::shared_ptr<concurrencpp::executor> executor);
};

timer API:

class timer {
	/*
		Creates an empty timer.
	*/
	timer() noexcept = default;

	/*
		Cancels the timer, if not empty.
	*/
	~timer() noexcept;

	/*
		Moves the content of rhs to *this.
		rhs is empty after this call.
	*/
	timer(timer&& rhs) noexcept = default;

	/*
		Moves the content of rhs to *this.
		rhs is empty after this call.
		Returns *this.
	*/
	timer& operator = (timer&& rhs) noexcept;

	/*
		Cancels this timer.
		After this call, the associated timer_queue will not schedule *this to run again and *this becomes empty.
		Scheduled, but not yet executed tasks are cancelled. 
		Ongoing tasks are unaffected.
		This method has no effect if *this is empty or the associated timer_queue has already expired.
	*/
	void cancel();

	/*
		Returns the associated executor of this timer.	
		Throws concurrencpp::errors::empty_timer if *this is empty.
	*/
	std::shared_ptr<executor> get_executor() const;

	/*
		Returns the associated timer_queue of this timer.
		Throws concurrencpp::errors::empty_timer if *this is empty.
	*/
	std::weak_ptr<timer_queue> get_timer_queue() const;

	/*
		Returns the due time of this timer.
		Throws concurrencpp::errors::empty_timer if *this is empty.
	*/
	std::chrono::milliseconds get_due_time() const;

	/*
		Returns the frequency of this timer.	
		Throws concurrencpp::errors::empty_timer if *this is empty.
	*/
	std::chrono::milliseconds get_frequency() const;

	/*
		Sets new frequency for this timer.
		Callables already scheduled to run at the time of invocation are not affected.	
		Throws concurrencpp::errors::empty_timer if *this is empty.
	*/
	void set_frequency(std::chrono::milliseconds new_frequency);

	/*
		Returns true if *this is not an empty timer, false otherwise.
		The timer should not be used if this->operator bool() is false.
	*/
	operator bool() const noexcept;
};

Regular timer example:

#include "concurrencpp/concurrencpp.h"

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

using namespace std::chrono_literals;

int main() {
	concurrencpp::runtime runtime;
	std::atomic_size_t counter = 1;
	concurrencpp::timer timer = runtime.timer_queue()->make_timer(
		1500ms,
		2000ms,
		runtime.thread_pool_executor(),
		[&] {
			const auto c = counter.fetch_add(1);
			std::cout << "timer was invoked for the " << c << "th time" << std::endl;
		});

	std::this_thread::sleep_for(12s);
	return 0;
}

In this example we create a regular timer by using the timer queue. The timer schedules its callable after 1.5 seconds, then fires its callable every 2 seconds. The given callable runs in the threadpool executor.
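
To make the relationship between due time and frequency concrete, the sketch below lists the idealized offsets (measured from timer creation) at which a regular timer becomes due within a given window. It is only an illustration: expected_fire_offsets is a hypothetical helper that is not part of concurrencpp, and real firing times depend on the timer queue thread and the target executor, so they will drift from these values.

```cpp
#include <cassert>
#include <chrono>
#include <vector>

// Hypothetical helper: returns the idealized offsets from timer creation at which
// a regular timer is due, for every deadline that falls inside `window`.
inline std::vector<std::chrono::milliseconds> expected_fire_offsets(
	std::chrono::milliseconds due_time,
	std::chrono::milliseconds frequency,
	std::chrono::milliseconds window) {
	// A non-positive frequency would make the loop below never terminate.
	assert(frequency.count() > 0);

	std::vector<std::chrono::milliseconds> offsets;
	for (auto offset = due_time; offset <= window; offset += frequency) {
		offsets.push_back(offset);
	}

	return offsets;
}
```

With the values used in the example above (a due time of 1500ms, a frequency of 2000ms and a 12 second sleep), the helper yields six offsets: 1500ms, 3500ms, 5500ms, 7500ms, 9500ms and 11500ms.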

Oneshot timers

A oneshot timer is a one-time timer with only a due time - after it schedules its callable to run once it never reschedules it to run again.

Oneshot timer example:

#include "concurrencpp/concurrencpp.h"

#include <chrono>
#include <iostream>
#include <thread>

using namespace std::chrono_literals;

int main() {
	concurrencpp::runtime runtime;
	concurrencpp::timer timer = runtime.timer_queue()->make_one_shot_timer(
		3000ms,
		runtime.thread_executor(),
		[&] {
			std::cout << "hello and goodbye" << std::endl;
		});

	std::this_thread::sleep_for(4s);
	return 0;
}

In this example, we create a timer that runs only once - 3 seconds after its creation, the timer will schedule its callable to run on a new thread of execution (using concurrencpp::thread_executor).

Delay objects

A delay object is a result object that becomes ready when its due time is reached. Applications can co_await this result object to delay the current coroutine in a non-blocking way. The current coroutine is resumed by the executor that was passed to make_delay_object.

Delay object example:

#include "concurrencpp/concurrencpp.h"

#include <chrono>
#include <iostream>
#include <memory>
#include <thread>

using namespace std::chrono_literals;

concurrencpp::null_result delayed_task(
	std::shared_ptr<concurrencpp::timer_queue> tq,
	std::shared_ptr<concurrencpp::thread_pool_executor> ex) {
	size_t counter = 1;

	while(true) {
		std::cout << "task was invoked " << counter << " times." << std::endl;
		counter++;

		co_await tq->make_delay_object(1500ms, ex);
	}
}

int main() {
	concurrencpp::runtime runtime;
	delayed_task(runtime.timer_queue(), runtime.thread_pool_executor());

	std::this_thread::sleep_for(10s);
	return 0;
}

In this example, we created a coroutine (that does not marshal any result or thrown exception), which delays itself in a loop by calling co_await on a delay object.

The runtime object

The concurrencpp runtime object is the agent used to acquire, store and create new executors.
The runtime must be created as a value type as soon as the main function starts to run. When the concurrencpp runtime gets out of scope, it iterates over its stored executors and shuts them down one by one by calling executor::shutdown. Executors then exit their inner work loop and any subsequent attempt to schedule a new task will throw a concurrencpp::errors::executor_shutdown exception. The runtime also contains the global timer queue used to create timers and delay objects. Upon destruction, stored executors will destroy unexecuted tasks, and wait for ongoing tasks to finish. If an ongoing task tries to use an executor to spawn new tasks or schedule its own task continuation, an exception will be thrown. In this case, ongoing tasks need to quit as soon as possible, allowing their underlying executors to quit. The timer queue will also be shut down, cancelling all running timers. With this RAII style of code, no tasks can be processed before the creation of the runtime object, or while/after the runtime gets out of scope. This frees concurrent applications from needing to communicate termination messages explicitly. Tasks are free to use executors as long as the runtime object is alive.
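
The RAII behaviour described above can be sketched with standard C++ only. This is an illustration under simplifying assumptions, not the library's implementation: runtime_sketch and inline_executor_sketch are hypothetical types, the executor runs tasks inline and is not thread-safe, and std::runtime_error stands in for concurrencpp::errors::executor_shutdown.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <stdexcept>
#include <vector>

// Hypothetical single-threaded stand-in for an executor: runs tasks inline until shut down.
class inline_executor_sketch {
	bool m_shutdown_requested = false;

public:
	void enqueue(const std::function<void()>& task) {
		if (m_shutdown_requested) {
			// Stand-in for concurrencpp::errors::executor_shutdown.
			throw std::runtime_error("executor was shut down");
		}

		task();
	}

	// Safe to call more than once: later calls change nothing.
	void shutdown() noexcept {
		m_shutdown_requested = true;
	}

	bool shutdown_requested() const noexcept {
		return m_shutdown_requested;
	}
};

// Hypothetical stand-in for the runtime: creates and stores executors,
// and shuts every stored executor down when it goes out of scope.
class runtime_sketch {
	std::vector<std::shared_ptr<inline_executor_sketch>> m_executors;

public:
	std::shared_ptr<inline_executor_sketch> make_executor() {
		auto executor = std::make_shared<inline_executor_sketch>();
		m_executors.push_back(executor);
		return executor;
	}

	~runtime_sketch() noexcept {
		for (auto& executor : m_executors) {
			executor->shutdown();
		}
	}
};
```

An executor handle that outlives the runtime_sketch object stays valid as an object, but every later enqueue call throws, which mirrors why tasks should only use executors while the runtime is alive.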

runtime API

class runtime {
	/*
		Creates a runtime object with default options.	
	*/
	runtime();

	/*
		Creates a runtime object with user defined options.
	*/
	runtime(const concurrencpp::runtime_options& options);

	/*
		Destroys this runtime object. 
		Calls executor::shutdown on each monitored executor.
		Calls timer_queue::shutdown on the global timer queue.
	*/
	~runtime() noexcept;

	/*
		Returns this runtime timer queue used to create new timers.
	*/
	std::shared_ptr<concurrencpp::timer_queue> timer_queue() const noexcept;

	/*
		Returns this runtime concurrencpp::inline_executor
	*/
	std::shared_ptr<concurrencpp::inline_executor> inline_executor() const noexcept;

	/*
		Returns this runtime concurrencpp::thread_pool_executor
	*/
	std::shared_ptr<concurrencpp::thread_pool_executor> thread_pool_executor() const noexcept;

	/*
		Returns this runtime concurrencpp::background_executor
	*/
	std::shared_ptr<concurrencpp::thread_pool_executor> background_executor() const noexcept;

	/*
		Returns this runtime concurrencpp::thread_executor
	*/
	std::shared_ptr<concurrencpp::thread_executor> thread_executor() const noexcept;

	/*
		Creates a new concurrencpp::worker_thread_executor and registers it in this runtime.
		Might throw std::bad_alloc or std::system_error if any underlying memory or system resource could not have been acquired.
	*/
	std::shared_ptr<concurrencpp::worker_thread_executor> make_worker_thread_executor();

	/*
		Creates a new concurrencpp::manual_executor and registers it in this runtime.
		Might throw std::bad_alloc or std::system_error if any underlying memory or system resource could not have been acquired.
	*/
	std::shared_ptr<concurrencpp::manual_executor> make_manual_executor();

	/*
		Creates a new user defined executor and registers it in this runtime.
		executor_type must be a valid concrete class of concurrencpp::executor.
		Might throw std::bad_alloc if no memory is available.
		Might throw any exception that the constructor of <<executor_type>> might throw. 
	*/
	template<class executor_type, class ... argument_types>
	std::shared_ptr<executor_type> make_executor(argument_types&& ... arguments);

	/*
		Returns the version of concurrencpp that the library was built with.
	*/
	static std::tuple<unsigned int, unsigned int, unsigned int> version() noexcept;
};

Creating user-defined executors

As mentioned before, applications can create their own custom executor type by inheriting from the derivable_executor class. There are a few points to consider when implementing user-defined executors. The most important thing to remember is that executors are used from multiple threads, so implemented methods must be thread-safe.

New executors can be created using runtime::make_executor. Applications must not create new executors with plain instantiation (such as std::make_shared or plain new), only by using runtime::make_executor. Also, applications must not try to re-instantiate the built-in concurrencpp executors, like the thread_pool_executor or the thread_executor. Those executors must only be accessed through their existing instance in the runtime object.

Another important point is to handle shutdown correctly: shutdown, shutdown_requested and enqueue should all monitor the executor state and behave accordingly when invoked:

  • shutdown should tell underlying threads to quit and then join them.
  • shutdown might be called multiple times, and the method must handle this scenario by ignoring any subsequent call to shutdown after the first invocation.
  • enqueue must throw a concurrencpp::errors::executor_shutdown exception if shutdown had been called before.

Implementing an executor is one of the rare cases in which applications need to work with the concurrencpp::task class directly. concurrencpp::task is a std::function-like object, but with a few differences. Like std::function, the task object stores a callable that acts as the asynchronous operation. Unlike std::function, concurrencpp::task is a move-only type. On invocation, task objects receive no parameters and return void. Moreover, every task object can be invoked only once. After the first invocation, the task object becomes empty. Invoking an empty task object is equivalent to invoking an empty lambda ([]{}), and will not throw any exception. Task objects receive their callable as a forwarding reference (type&& where type is a template parameter), and not by copy (like std::function). Construction of the stored callable happens in-place. This allows task objects to contain callables that are move-only types (like std::unique_ptr and concurrencpp::result). Task objects try to use different methods to optimize the usage of the stored types. Task objects apply the short-buffer-optimization (sbo) for regular, small callables, and will inline calls to std::coroutine_handle<void> by calling them directly without virtual dispatch.
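
The invoke-once, move-only semantics described above can be sketched in standard C++. The class below is a hypothetical, simplified stand-in named one_shot_task_sketch, not concurrencpp::task: it always heap-allocates (no short-buffer optimization), takes its callable by value rather than by forwarding reference, and has no special handling for coroutine handles.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical, simplified stand-in for a move-only task that can be invoked once.
class one_shot_task_sketch {
	struct callable_base {
		virtual ~callable_base() = default;
		virtual void invoke() = 0;
	};

	template<class callable_type>
	struct callable_holder final : callable_base {
		callable_type callable;

		explicit callable_holder(callable_type c) : callable(std::move(c)) {}

		void invoke() override {
			callable();
		}
	};

	// Owning pointer makes the class move-only; null means "empty".
	std::unique_ptr<callable_base> m_callable;

public:
	one_shot_task_sketch() noexcept = default;
	one_shot_task_sketch(one_shot_task_sketch&&) noexcept = default;
	one_shot_task_sketch& operator=(one_shot_task_sketch&&) noexcept = default;

	// Stores the callable by moving it, so move-only callables are accepted.
	template<class callable_type>
	explicit one_shot_task_sketch(callable_type callable) :
		m_callable(std::make_unique<callable_holder<callable_type>>(std::move(callable))) {}

	// Invokes the stored callable at most once; an empty task does nothing.
	void operator()() {
		if (!m_callable) {
			return;
		}

		// Take ownership first, so *this is already empty while the callable runs
		// and the callable is destroyed right after the invocation.
		auto callable = std::move(m_callable);
		callable->invoke();
	}

	explicit operator bool() const noexcept {
		return static_cast<bool>(m_callable);
	}
};
```

A lambda that captures a std::unique_ptr can be stored in this sketch, which would not compile with std::function because std::function requires a copyable callable.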

task API

class task {
	/*
		Creates an empty task object.
	*/
	task() noexcept;

	/*
		Creates a task object by moving the stored callable of rhs to *this.
		If rhs is empty, then *this will also be empty after construction.
		After this call, rhs is empty.
	*/
	task(task&& rhs) noexcept;

	/*
		Creates a task object by storing callable in *this.
		<<typename std::decay<callable_type>::type>> will be in-place-
		constructed inside *this by perfect forwarding callable.
	*/
	template<class callable_type>
	task(callable_type&& callable);

	/*
		Destroys stored callable, does nothing if empty.
	*/
	~task() noexcept;

	task(const task& rhs) = delete;
	task& operator=(const task&& rhs) = delete;

	/*
		If *this is empty, does nothing.
		Invokes stored callable, and immediately destroys it.
		After this call, *this is empty.
		May throw any exception that the invoked callable may throw.
	*/
	void operator()();

	/*
		Moves the stored callable of rhs to *this.
		If rhs is empty, then *this will also be empty after this call.
		If *this already contains a stored callable, operator = destroys it first.
	*/
	task& operator=(task&& rhs) noexcept;

	/*
		If *this is not empty, task::clear destroys the stored callable and empties *this.
		If *this is empty, clear does nothing.
	*/
	void clear() noexcept;

	/*
		Returns true if *this stores a callable, false otherwise.
	*/
	operator bool() const noexcept;

	/*
		Returns true if *this stores a callable,
		and that stored callable has the same type as <<typename std::decay<callable_type>::type>>
	*/
	template<class callable_type>
	bool contains() const noexcept;
};

When implementing user-defined executors, it is up to the implementation to store tasks (when enqueue is called), and execute them according to the executor inner-mechanism.

Example: using a user-defined executor:

#include "concurrencpp/concurrencpp.h"

#include <condition_variable>
#include <cstdio>
#include <iostream>
#include <mutex>
#include <queue>
#include <span>
#include <string>
#include <string_view>
#include <thread>

class logging_executor : public concurrencpp::derivable_executor<logging_executor> {

private:
	mutable std::mutex _lock;
	std::queue<concurrencpp::task> _queue;
	std::condition_variable _condition;
	bool _shutdown_requested;
	std::thread _thread;
	const std::string _prefix;

	void work_loop() {
		while (true) {
			std::unique_lock<std::mutex> lock(_lock);
			if (_shutdown_requested) {
				return;
			}

			if (!_queue.empty()) {
				auto task = std::move(_queue.front());
				_queue.pop();
				lock.unlock();
				std::cout << _prefix << " A task is being executed" << std::endl;
				task();
				continue;
			}

			_condition.wait(lock, [this] {
				return !_queue.empty() || _shutdown_requested;
			});
		}
	}

public:
	logging_executor(std::string_view prefix) :
		derivable_executor<logging_executor>("logging_executor"),
		_shutdown_requested(false),
		_prefix(prefix) {
		_thread = std::thread([this] {
			work_loop();
		});
	}

	void enqueue(concurrencpp::task task) override {
		std::cout << _prefix << " A task is being enqueued!" << std::endl;

		std::unique_lock<std::mutex> lock(_lock);
		if (_shutdown_requested) {
			throw concurrencpp::errors::executor_shutdown("logging executor - executor was shutdown.");
		}

		_queue.emplace(std::move(task));
		_condition.notify_one();
	}

	void enqueue(std::span<concurrencpp::task> tasks) override {
		std::cout << _prefix << " " << tasks.size() << " tasks are being enqueued!" << std::endl;

		std::unique_lock<std::mutex> lock(_lock);
		if (_shutdown_requested) {
			throw concurrencpp::errors::executor_shutdown("logging executor - executor was shutdown.");
		}

		for (auto& task : tasks) {
			_queue.emplace(std::move(task));
		}

		_condition.notify_one();
	}

	int max_concurrency_level() const noexcept override {
		return 1;
	}

	bool shutdown_requested() const noexcept override {
		std::unique_lock<std::mutex> lock(_lock);
		return _shutdown_requested;
	}

	void shutdown() noexcept override {
		std::cout << _prefix << " shutdown requested" << std::endl;

		std::unique_lock<std::mutex> lock(_lock);
		if (_shutdown_requested) return; //nothing to do.
		_shutdown_requested = true;
		lock.unlock();

		_condition.notify_one();
		_thread.join();
	}
};

int main() {
	concurrencpp::runtime runtime;
	auto logging_ex = runtime.make_executor<logging_executor>("Session #1234");

	for (size_t i = 0; i < 10; i++) {
		logging_ex->post([] {
			std::cout << "hello world" << std::endl;
		});
	}

	std::getchar();
	return 0;
}

In this example, we created an executor which logs actions like enqueuing a task or executing it. We implement the executor interface, and we request the runtime to create and store an instance of it by calling runtime::make_executor. The rest of the application behaves exactly the same as if we were to use non user-defined executors.

Supported platforms and tools

  • Operating systems: Linux, macOS, Windows (Windows 10 and above)
  • Compilers: MSVC (Visual Studio 2019 version 16.8.2 and above), Clang (Clang-11 and above)
Owner
David Haim
A software engineer.
Comments
  • semaphore file not found in macOS

    Hi, I checked out the 'develop' branch and tried to build this project. The compiler reports that the semaphore header cannot be found.

    Mojave 10.14 Clang-10

    concurrencpp/executors/thread_pool_executor.h:9:10: fatal error: 'semaphore' file not found
    [build] #include <semaphore>
    
  • Use Modern CMake

    I started the work for this library to support clients:

    • Pulling in the project in their source tree
    • Pulling in the project from the system using find_package after installing

    Please take a look at the work done so far and comment on my assumptions.

  • Workaround conversion issue of span/vector in Clang 14, add Clang 14 to CI

    This suggests a workaround for #73 which allows the library and tests to compile using Clang 14. Instead of adding the two overloads to bulk_post and bulk_submit, an alternative workaround could be to change various places in the tests where vectors are passed to these functions. But it would have required many more changes so I went with this other approach.

    Also adds Clang 14 on Ubuntu 22.04 to CI.

  • Specific Test Failing

    Hello! Your library looks very impressive! All the tests are passing consistently on my main system (amd-ubuntu-x86_64.txt) but I've seen that my Pi has issues completing them sometimes.

    System

    Raspberry Pi 4
    Linux nebula 5.4.0-1015-raspi #15-Ubuntu SMP Fri Jul 10 05:34:24 UTC 2020 aarch64 aarch64 aarch64 GNU/Linux
    
    COLLECT_GCC=cc
    COLLECT_LTO_WRAPPER=/usr/lib/gcc/aarch64-linux-gnu/9/lto-wrapper
    Target: aarch64-linux-gnu
    Configured with: ../src/configure -v --with-pkgversion='Ubuntu 9.3.0-10ubuntu2' --with-bugurl=file:///usr/share/doc/gcc-9/README.Bugs --enable-languages=c,ada,c++,go,d,fortran,objc,obj-c++,gm2 --prefix=/usr --with-gcc-major-version-only --program-suffix=-9 --program-prefix=aarch64-linux-gnu- --enable-shared --enable-linker-build-id --libexecdir=/usr/lib --without-included-gettext --enable-threads=posix --libdir=/usr/lib --enable-nls --enable-clocale=gnu --enable-libstdcxx-debug --enable-libstdcxx-time=yes --with-default-libstdcxx-abi=new --enable-gnu-unique-object --disable-libquadmath --disable-libquadmath-support --enable-plugin --enable-default-pie --with-system-zlib --with-target-system-zlib=auto --enable-multiarch --enable-fix-cortex-a53-843419 --disable-werror --enable-checking=release --build=aarch64-linux-gnu --host=aarch64-linux-gnu --target=aarch64-linux-gnu
    Thread model: posix
    gcc version 9.3.0 (Ubuntu 9.3.0-10ubuntu2)
    
    COLLECT_GCC=c++
    COLLECT_LTO_WRAPPER=/usr/lib/gcc/aarch64-linux-gnu/9/lto-wrapper
    Target: aarch64-linux-gnu
    Configured with: ../src/configure -v --with-pkgversion='Ubuntu 9.3.0-10ubuntu2' --with-bugurl=file:///usr/share/doc/gcc-9/README.Bugs --enable-languages=c,ada,c++,go,d,fortran,objc,obj-c++,gm2 --prefix=/usr --with-gcc-major-version-only --program-suffix=-9 --program-prefix=aarch64-linux-gnu- --enable-shared --enable-linker-build-id --libexecdir=/usr/lib --without-included-gettext --enable-threads=posix --libdir=/usr/lib --enable-nls --enable-clocale=gnu --enable-libstdcxx-debug --enable-libstdcxx-time=yes --with-default-libstdcxx-abi=new --enable-gnu-unique-object --disable-libquadmath --disable-libquadmath-support --enable-plugin --enable-default-pie --with-system-zlib --with-target-system-zlib=auto --enable-multiarch --enable-fix-cortex-a53-843419 --disable-werror --enable-checking=release --build=aarch64-linux-gnu --host=aarch64-linux-gnu --target=aarch64-linux-gnu
    Thread model: posix
    gcc version 9.3.0 (Ubuntu 9.3.0-10ubuntu2)
    

    Issue

    assert_true fails on this function but only sometimes

    bool scheduled_async() const noexcept {
    	return std::this_thread::get_id() == m_execution_thread.get_id();
    }
    

    I modified this slightly when running the tests:

    bool scheduled_async() const noexcept {
    
    	auto this_thread_id = std::this_thread::get_id();
    	auto execution_thread_id = m_execution_thread.get_id();
    
    	std::cout << "$$ std::this_thread::get_id() == " << this_thread_id << std::endl;
    	std::cout << "$$ m_execution_thread::get_id() == " << execution_thread_id << std::endl;
    
    	return this_thread_id == execution_thread_id;
    }
    

    I saw that m_execution_thread.get_id() just returns 0 sometimes. I don't really see a pattern because the tests will run fine about 65% of the time but any one of these are randomly failing:

    test_result_resolve_via_impl<int>();
    test_result_resolve_via_impl<std::string>();
    test_result_resolve_via_impl<void>();
    test_result_resolve_via_impl<int&>();
    test_result_resolve_via_impl<std::string&>();
    

    Example 1

    ...
    test_result_resolve_via_impl<std::string&>() // Type is std::string&
    	test_result_resolve_via_impl()
    		test_result_resolve_via_ready_val_force_resuchuling()
    			assert_true(executor->scheduled_async())
    				...
    

    GDB: (break abort)

    $$ std::this_thread::get_id() == 281474830115216
    $$ m_execution_thread::get_id() == 0
    assertion faild. expected: [true] actual: [false].[Switching to Thread 0xfffff7432190 (LWP 72699)]
    Thread 36 "tests" hit Breakpoint 4, __GI_abort () at abort.c:49
    49	abort.c: No such file or directory.
    (gdb) backtrace
    #0  __GI_abort () at abort.c:49
    #1  0x0000000000405454 in concurrencpp::tests::assert_true(bool) ()
    Backtrace stopped: previous frame identical to this frame (corrupt stack?)
    

    Example 2

    ...
    test_result_resolve_via_impl<int>() // Type is int
    	test_result_resolve_via_impl()
    		test_result_resolve_via_ready_val_force_resuchuling()
    			assert_true(executor->scheduled_async())
    				...
    

    GDB: (break abort)

    $$ std::this_thread::get_id() == 281474838507920
    $$ m_execution_thread::get_id() == 0
    assertion faild. expected: [true] actual: [false].[Switching to Thread 0xfffff7c33190 (LWP 73254)]
    Thread 36 "tests" hit Breakpoint 2, __GI_abort () at abort.c:49
    49	abort.c: No such file or directory.
    (gdb) backtrace
    #0  __GI_abort () at abort.c:49
    #1  0x0000000000405454 in concurrencpp::tests::assert_true(bool) ()
    Backtrace stopped: previous frame identical to this frame (corrupt stack?)
    
  • Should pthreads be a public dependency?

    In https://github.com/David-Haim/concurrencpp/blob/580f430caf1369521ee1422c15c4c8a89823972b/cmake/coroutineOptions.cmake, pthreads is a PRIVATE dependency:

    target_link_libraries(${TARGET} PRIVATE Threads::Threads)
    

    Not only the main library but also the unit tests call the target_coroutine_options CMake function, thus the tests link to pthreads. However, when I try to build this simple example on Linux, I get a linker error missing the pthreads library if I don't explicitly link to pthreads:

    #include "concurrencpp/concurrencpp.h"
    #include <iostream>
    
    int main() {
        concurrencpp::runtime runtime;
        auto result = runtime.thread_executor()->submit([] {
            std::cout << "hello world" << std::endl;
        });
    
        result.get();
        return 0;
    }
    
    find_package(concurrencpp REQUIRED CONFIG)
    
    add_executable(${PROJECT_NAME} test_package.cpp)
    target_link_libraries(${PROJECT_NAME} concurrencpp::concurrencpp)
    target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_20)
    

    Should Threads::Threads be made a PUBLIC dependency so that any consumer links to pthreads automatically? Or is it intentional that users must link to pthreads manually? Is there any reasonable way to use the library without pthreads?

  • constexpr function must be initialized

    Disclaimer: I am very new to C++, so I'm probably doing something stupid.

    When I try to build concurrencpp with:

    mkdir build
    cd build
    cmake build ..
    cmake --build .

    I get an error:

    concurrencpp/include/concurrencpp/task.h:119:20: error: variables defined in a constexpr function must be initialized
    void(*move_fn)(void* src, void* dst) noexcept;
    
  • rework result_promise to only allocate on get_result

    This was done to resolve leaked state that was caused when get_result isn't called. In practice, the set_* methods were not usable until after get_result was called, since they reset the promise's contained state (making it impossible to get the corresponding result).

    I found this leak using the leak sanitizer with the result_promise_tests.

  • Cannot compile

    Hi,

    Platform: Linux, Debian Bullseye.

    I tried to compile with:

    cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_CXX_COMPILER=clang++-11 -S . -B build/lib

    The first step works, but

    cmake --build build/lib

    fails with many errors. For example:

    error: ‘coroutine_handle’ does not name a type; did you mean ‘coroutine_handle_functor’
    error: expected ‘)’ before ‘<’ token
    coroutine_handle_functor(coroutine_handle<void> coro_handle) noexcept : m_coro_handle(coro_handle) {}
    error: class ‘concurrencpp::details::coroutine_handle_functor’ does not have any field named ‘m_coro_handle’

    The same happens with GCC 11. I changed the file coroutineOptions.cmake to:

    function(target_coroutine_options TARGET)
      if(MSVC)
        target_compile_options(${TARGET} PUBLIC /std:c++latest /permissive-)
        return()
      endif()
    
      find_package(Threads REQUIRED)
      target_link_libraries(${TARGET} PRIVATE Threads::Threads)
    
      if(CMAKE_CXX_COMPILER_ID MATCHES "Clang")
        target_compile_options(${TARGET} PUBLIC -std=libc++-fcoroutines-ts)
        target_link_options(${TARGET} PUBLIC -std=libc++)
        set_target_properties(${TARGET} PROPERTIES CXX_EXTENSIONS NO)
      elseif(CMAKE_CXX_COMPILER_ID MATCHES "GNU")
        target_compile_options(${TARGET} PUBLIC -std=gnu++20 -fcoroutines)
        target_link_options(${TARGET} PUBLIC -std=gnu++20)
        set_target_properties(${TARGET} PROPERTIES CXX_EXTENSIONS NO)
      else()
        message(FATAL_ERROR "Compiler not supported: ${CMAKE_CXX_COMPILER_ID}")
      endif()
    endfunction()
    

    This library looks really good but, as of now, I cannot compile it. Before looking further I would like your opinion on this. I don't understand why the code fails to compile with recent compilers and gnu++20 or clang 11.

    Thanks :)

  • Compile error in use

    Version: newest
    OS: Windows 10
    IDE: Visual Studio 2019 (newest)

    When I compile my project with this library, I get:

    concurrencpp\results\when_result.h(135,76): warning C4003: not enough arguments for function-like macro invocation 'max'
    concurrencpp\results\when_result.h(135,76): error C2589: '(': illegal token on right side of '::'

    The code is:

    auto completed_result_index = std::numeric_limits<size_t>::max();
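
    MSVC usually emits C4003 together with C2589 on a line like this when a function-like max macro is visible at that point. The macro that <windows.h> defines unless NOMINMAX is set is the usual suspect, although the report does not confirm which headers were included. The sketch below simulates such a macro and shows one standard workaround, parenthesizing the function name. The helper unset_index is hypothetical and not part of concurrencpp.

    ```cpp
    #include <cstddef>
    #include <limits>

    // Stand-in for the function-like macro that <windows.h> provides when
    // NOMINMAX is not defined (an assumption about the reporter's build).
    #define max(a, b) (((a) > (b)) ? (a) : (b))

    // Hypothetical helper, not part of concurrencpp.
    std::size_t unset_index() {
        // Wrapping the name in parentheses keeps the preprocessor from treating
        // `max(` as a macro invocation, so the member function is called.
        return (std::numeric_limits<std::size_t>::max)();
    }

    // Keep the simulated macro from leaking into anything included later.
    #undef max
    ```

    Defining NOMINMAX before including <windows.h> is the other common option. It avoids the macro entirely instead of working around it at each call site.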

  • shared_result?

    Are there plans to include a "shared_result" type that is not move-only and allows multiple threads to co_await on the same result? This type would be analogous to std::shared_future.

    My use case is having a UI thread continuously poll an in-progress result until it is available, then using its value once it becomes available. Having to manually store the value once it becomes available significantly complicates the code on my side.
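
    For comparison, the std::shared_future pattern referred to above can be sketched with the standard library alone. This is not concurrencpp API. The helper try_get is hypothetical and only illustrates the non-blocking poll a UI thread would run on each frame.

    ```cpp
    #include <chrono>
    #include <future>
    #include <optional>

    // Hypothetical helper: non-blocking check of a shared future.
    // Returns a copy of the value if it is ready, std::nullopt otherwise.
    // If the producer stored an exception, get() rethrows it here.
    template <class T>
    std::optional<T> try_get(const std::shared_future<T>& future) {
        using namespace std::chrono_literals;
        if (future.valid() && future.wait_for(0s) == std::future_status::ready) {
            // Unlike std::future, get() on a shared_future may be called
            // repeatedly and from several copies of the same future.
            return future.get();
        }
        return std::nullopt;
    }
    ```

    Because std::shared_future is copyable, each interested thread can hold its own copy and poll it independently.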

  • thread_pool_executor deadlock

    I'm fascinated with your toolkit, and while experimenting I may have discovered an executor deadlock bug. It occurs after submitting multiple result sets and awaiting completion via when_all. The same issue has also been observed with when_any.

    I've isolated a relatively small test case that uses when_all; it is included below.

    In my test case I call submit() in a two-level nested fashion, passing the executor in as an argument.

    At the second level, the code can be conditionally compiled to use std::thread instead of submit(). With submit(), a deadlock is observed. With std::thread, no deadlock is observed.

    I am using MSVC 19.29.30136 for x64 (as reported from the command line by cl --version).

    Dev tools environment is:

    Windows 10 hosted VM configured with 2 CPUs.

    Microsoft Visual Studio Enterprise 2019 Version 16.11.5 VisualStudio.16.Release/16.11.5+31729.503

    Installed Version: Enterprise

    Visual C++ 2019 00435-60000-00000-AA788 Microsoft Visual C++ 2019

    Here is the test case:

    #include <chrono>
    #include <cstdlib>
    #include <ctime>
    #include <iostream>
    #include <memory>
    #include <string>
    #include <thread>
    #include <vector>

    #include "concurrencpp/concurrencpp.h"

    ///////////////////////////////////////////////////////////////////////////////////////
    int dummy_task2(std::shared_ptr<concurrencpp::thread_pool_executor> background_executor, int instance)
    {
        std::cerr << "+dummy_task2(): instance=" << instance << ", tid=" << std::this_thread::get_id() << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds((instance + 1) * 250));
        std::cerr << "-dummy_task2(): instance=" << instance << ", tid=" << std::this_thread::get_id() << std::endl;
        return 0;
    }

    ///////////////////////////////////////////////////////////////////////////////////////
    concurrencpp::result<int> level2_dispatch(std::shared_ptr<concurrencpp::thread_pool_executor> background_executor)
    {
        std::vector<concurrencpp::result<int>> results;

        std::cerr << "+level2_dispatch(), tid=" << std::this_thread::get_id() << std::endl;

        for (int i = 0; i < 1; i++)
        {
            // start a thread for each...
    //#define USE_STD_THREAD
    #if defined(USE_STD_THREAD)
            // Variant that does not deadlock: run the task on a detached std::thread.
            concurrencpp::result_promise<int> promise;
            auto res = promise.get_result();
            std::thread my_3_party_executor([promise = std::move(promise), background_executor, i]() mutable {
                promise.set_result(dummy_task2(background_executor, i));
            });
            my_3_party_executor.detach();
            results.emplace_back(std::move(res));
    #else
            // Variant that deadlocks: submit the task to the same executor.
            results.emplace_back(background_executor->submit(dummy_task2, background_executor, i));
    #endif
        }

        auto all_consumed = concurrencpp::when_all(std::begin(results), std::end(results));
        all_consumed.get();

        std::cerr << "-level2_dispatch(), tid=" << std::this_thread::get_id() << std::endl;
        co_return 0;
    }

    ///////////////////////////////////////////////////////////////////////////////////////
    int dummy_task1(std::shared_ptr<concurrencpp::thread_pool_executor> background_executor, int instance)
    {
        std::cerr << "+dummy_task1(): instance=" << instance << ", tid=" << std::this_thread::get_id() << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds((instance + 1) * 250));
        try
        {
            // Blocks a pool thread until the nested dispatch completes.
            level2_dispatch(background_executor).get();
        }
        catch (const std::exception& e)
        {
            std::cerr << "@main(): EXCEPTION: " << e.what() << std::endl;
            return EXIT_FAILURE;
        }
        std::cerr << "-dummy_task1(): instance=" << instance << ", tid=" << std::this_thread::get_id() << std::endl;
        return 0;
    }

    ///////////////////////////////////////////////////////////////////////////////////////
    concurrencpp::result<int> level1_dispatch(std::shared_ptr<concurrencpp::thread_pool_executor> background_executor)
    {
        std::vector<concurrencpp::result<int>> results;

        std::cerr << "+level1_dispatch(), tid=" << std::this_thread::get_id() << std::endl;
        for (int i = 0; i < 1; i++)
        {
            // start a thread for each...
            results.emplace_back(background_executor->submit(dummy_task1, background_executor, i));
        }

        auto all_consumed = concurrencpp::when_all(std::begin(results), std::end(results));
        all_consumed.get();

        std::cerr << "-level1_dispatch(), tid=" << std::this_thread::get_id() << std::endl;
        co_return 0;
    }

    ///////////////////////////////////////////////////////////////////////////////////////
    int main(int argc, char *argv[])
    {
        concurrencpp::runtime runtime;

        std::cerr << "+main(): tid=" << std::this_thread::get_id() << std::endl;
        try
        {
            level1_dispatch(runtime.background_executor()).get();
        }
        catch (const std::exception& e)
        {
            std::cerr << "@main(): EXCEPTION: " << e.what() << std::endl;
            return EXIT_FAILURE;
        }
        std::cerr << "-main(): tid=" << std::this_thread::get_id() << std::endl;
    }

  • Use standard stdlib by default, make libc++ optional

    Closes #86.

    Since there is no inbuilt support in CMake to configure the stdlib, the suggested approach is setting the appropriate compile options in a toolchain file.

  • Support clang with other standard libraries

    I've successfully built with clang and have run the test suite (including TSAN where applicable) on the following platforms:

    • Ubuntu 22.04, libstdc++ 11
    • Ubuntu 22.04, libstdc++ 12
    • Windows, MSVC 14.33.31629*

    It seems reasonable to support an OS's de facto standard library implementation when possible.

    * There is one failure on Windows when using clang with Microsoft's STL, arising from the handling of exceptions. It seems that clang sometimes mishandles exceptions that are caught by value. The test suite passes when this line is changed to catch by const reference rather than by value.
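
    As a general illustration of the catch-by-const-reference form mentioned above (not the actual line from the test suite, which is not shown here), a minimal sketch follows. The helper describe is hypothetical. Catching by reference avoids copying the exception object and keeps its dynamic type intact.

    ```cpp
    #include <exception>
    #include <stdexcept>
    #include <string>

    // Hypothetical helper, not taken from the concurrencpp test suite.
    // Precondition: `error` is not a null exception_ptr.
    std::string describe(const std::exception_ptr& error) {
        try {
            std::rethrow_exception(error);
        } catch (const std::runtime_error& e) {  // const reference: no copy, no slicing
            return std::string("runtime_error: ") + e.what();
        } catch (const std::exception& e) {      // fallback for other standard exceptions
            return std::string("exception: ") + e.what();
        }
    }
    ```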
  • Cannot link to shared builds of the library on Windows

    concurrencpp currently sets

    set(CMAKE_WINDOWS_EXPORT_ALL_SYMBOLS TRUE)
    

    in order to work around the need on Windows to explicitly declare DLL exports. However, when trying to link to the library built as a shared target, I get the following linker errors:

    coroutine_promise_tests.obj : error LNK2019: unresolved external symbol
    "public: static struct concurrencpp::details::coroutine_per_thread_data
    concurrencpp::details::coroutine_per_thread_data::s_tl_per_thread_data" (...)
    referenced in function
    "private: static class concurrencpp::result<void> __cdecl concurrencpp::executor::submit_bridge$_InitCoro$2<void,
    class concurrencpp::worker_thread_executor,class `struct concurrencpp::null_result __cdecl
    concurrencpp::tests::initialy_rescheduled_null_result_coro(struct concurrencpp::executor_tag,
    class std::shared_ptr<class concurrencpp::worker_thread_executor>,class std::thread::id,
    class std::shared_ptr<class concurrencpp::worker_thread_executor>,
    class std::shared_ptr<class concurrencpp::worker_thread_executor>,
    class std::shared_ptr<class concurrencpp::worker_thread_executor>,
    class concurrencpp::tests::testing_stub,bool)'::`2'::<lambda_1> >(struct concurrencpp::executor_tag,
    class concurrencpp::worker_thread_executor &,class `struct concurrencpp::null_result __cdecl
    concurrencpp::tests::initialy_rescheduled_null_result_coro(struct concurrencpp::executor_tag,
    class std::shared_ptr<class concurrencpp::worker_thread_executor>,class std::thread::id,
    class std::shared_ptr<class concurrencpp::worker_thread_executor>,
    class std::shared_ptr<class concurrencpp::worker_thread_executor>,
    class std::shared_ptr<class concurrencpp::worker_thread_executor>,
    class concurrencpp::tests::testing_stub,bool)'::`2'::<lambda_1>)" (...)
    

    The error is about this variable in promises.h:

    static thread_local coroutine_per_thread_data s_tl_per_thread_data;
    

    Note the following hint on WINDOWS_EXPORT_ALL_SYMBOLS:

    For global data symbols, __declspec(dllimport) must still be used when compiling against the code in the .dll.

    I tried adding __declspec(dllimport) to s_tl_per_thread_data but got another error indicating that symbols with thread-local storage cannot be part of a DLL interface.

    If shared builds cannot be supported on Windows, I suggest removing CMAKE_WINDOWS_EXPORT_ALL_SYMBOLS and adding a note to the README or CMakeLists.txt.
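
    A common way around the restriction that thread-local variables cannot be part of a DLL interface is to keep the variable internal and expose an accessor function instead. The sketch below shows the general shape only. The names per_thread_data and this_thread_data are hypothetical, the export annotation a real shared build would need on the function is omitted, and whether this fits concurrencpp's internals is not established here.

    ```cpp
    // Hypothetical stand-in for a per-thread bookkeeping struct.
    struct per_thread_data {
        int value = 0;
    };

    // Instead of a `static thread_local` data member that consumers reference
    // directly, only this function would cross the library boundary.
    per_thread_data& this_thread_data() noexcept {
        thread_local per_thread_data data;  // one instance per calling thread
        return data;
    }
    ```

    Callers then write this_thread_data().value where they previously touched the static member, at the cost of a function call that cannot be inlined across the DLL boundary.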

  • GCC 11 and GCC 12 support

    This PR provides the following improvements:

    • adds fixes and tweaks to make the library compile with GCC 11 and GCC 12
    • builds and runs tests using GCC 11 and GCC 12 in GitHub CI
    • adds support for ThreadSanitizer with GCC
Cpp-concurrency - cpp implementation of golang style concurrency

cpp-concurrency C++ implementation of golang style concurrency Usage Use existing single header concurrency.hpp or run script to merge multiple header

Aug 11, 2022
Open source PHP extension for Async IO, Coroutines and Fibers

Swoole is an event-driven asynchronous & coroutine-based concurrency networking communication engine with high performance written in C++ for PHP. Ope

Sep 28, 2022
C++14 asynchronous allocation aware futures (supporting then, exception handling, coroutines and connections)

Continuable is a C++14 library that provides full support for: lazy async continuation chaining based on callbacks (then) and expression templates, ca

Sep 21, 2022
C++20 coroutines-based cooperative multitasking library

Coop Coop is a C++20 coroutines-based library to support cooperative multitasking in the context of a multithreaded application. The syntax will be

Aug 21, 2022
Discrete-event simulation in C++20 using coroutines

SimCpp20 SimCpp20 is a discrete-event simulation framework for C++20. It is similar to SimPy and aims to be easy to set up and use. Processes are defi

Sep 24, 2022
Header-Only C++20 Coroutines library

CPP20Coroutines Header-Only C++20 Coroutines library This repository aims to demonstrate the capabilities of C++20 coroutines. generator Generates val

Aug 15, 2022
Cppcoro - A library of C++ coroutine abstractions for the coroutines TS

CppCoro - A coroutine library for C++ The 'cppcoro' library provides a large set of general-purpose primitives for making use of the coroutines TS pro

Sep 26, 2022
Coro - Single-header library facilities for C++2a Coroutines

coro This is a collection of single-header library facilities for C++2a Coroutines. coro/include/ co_future.h Provides co_future<T>, which is like std

Sep 28, 2022
Bistro: A fast, flexible toolkit for scheduling and running distributed tasks

Bistro is a flexible distributed scheduler, a high-performance framework supporting multiple paradigms while retaining ease of configuration, management, and monitoring.

Sep 21, 2022
Smart queue that executes tasks in threadpool-like manner

execq execq is kind of task-based approach of processing data using threadpool idea with extended features. It supports different task sources and mai

Aug 11, 2022
Partr - Parallel Tasks Runtime

Parallel Tasks Runtime A parallel task execution runtime that uses parallel depth-first (PDF) scheduling [1]. [1] Shimin Chen, Phillip B. Gibbons, Mic

Jul 17, 2022
Sep 30, 2022
The C++ Standard Library for Parallelism and Concurrency

Documentation: latest, development (master) HPX HPX is a C++ Standard Library for Concurrency and Parallelism. It implements all of the corresponding

Oct 3, 2022
HPX is a C++ Standard Library for Concurrency and Parallelism

HPX is a C++ Standard Library for Concurrency and Parallelism. It implements all of the corresponding facilities as defined by the C++ Standard. Additionally, in HPX we implement functionalities proposed as part of the ongoing C++ standardization process. We also extend the C++ Standard APIs to the distributed case.

Oct 3, 2022
Async++ concurrency framework for C++11

Async++ Async++ is a lightweight concurrency framework for C++11. The concept was inspired by the Microsoft PPL library and the N3428 C++ standard pro

Sep 23, 2022
The libdispatch Project, (a.k.a. Grand Central Dispatch), for concurrency on multicore hardware

Grand Central Dispatch Grand Central Dispatch (GCD or libdispatch) provides comprehensive support for concurrent code execution on multicore hardware.

Sep 23, 2022
Go-style concurrency in C

LIBMILL Libmill is a library that introduces Go-style concurrency to C. Documentation For the documentation check the project website: http://libmill.

Sep 30, 2022
A header-only C++ library for task concurrency

transwarp Doxygen documentation transwarp is a header-only C++ library for task concurrency. It allows you to easily create a graph of tasks where eve

Sep 25, 2022
Complementary Concurrency Programs for course "Linux Kernel Internals"

Complementary Programs for course "Linux Kernel Internals" Project Listing tpool: A lightweight thread pool. tinync: A tiny nc implementation using co

Sep 23, 2022