C++11 多线程

C++11 多线程

C++11 多线程相关的头文件 :

https://en.cppreference.com/w/cpp/thread

C++11 新标准中引入了四个头文件来支持多线程编程

<atomic>,<mutex>,<condition_variable>,<future>

  • <atomic> 该头文主要声明了两个类, std::atomic 和 std::atomic_flag,另外还声明了一套 C 风格的原子类型和与 C 兼容的原子操作的函数。
  • <mutex> 该头文件主要声明了与互斥量(mutex)相关的类,包括 std::mutex 系列类,std::lock_guard, std::unique_lock, 以及其他的类型和函数。
  • <condition_variable>:该头文件主要声明了与条件变量相关的类,包括 std::condition_variable 和 std::condition_variable_any。
  • <future> 该头文件主要声明了 std::promise, std::package_task 两个 Provider 类,以及 std::future 和 std::shared_future 两个 Future 类,另外还有一些与之相关的类型和函数,std::async() 函数就声明在此头文件中。

第一个实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// MacOS  Mojave 10.14.5
// c++ cppthread.cpp -o main -std=c++11

#include <iostream>
#include <thread>
void threadfoo()
{
std::cout << "threadfoo" << "\n";
}
int main()
{
// default construcror , t1 is not a thread ,t.joinable()==false
std::thread t1;
std::cout << std::boolalpha << t1.joinable() << "\n";

// joinable == true
std::thread t2(threadfoo);
std::cout << std::boolalpha << t2.joinable() << "\n";

t2.join();
// joinable == false
std::cout << std::boolalpha << t2.joinable() << "\n";

return 0;
}

atomic 头文件相关:

https://en.cppreference.com/w/cpp/atomic/atomic

C++11 新增头文件 <atomic>

atomic,atomic_flag,memory_order

atomic

简介:

原子类型对象的主要特点就是从不同线程访问不会导致数据竞争(data race)。因此从不同线程访问某个原子对象是良性 (well-defined) 行为,而通常对于非原子类型而言,并发访问某个对象(如果不做任何同步操作)会导致未定义 (undifined) 行为发生。

1
2
3
4
5
Each instantiation and full specialization of the `std::atomic` template defines an atomic type. If one thread writes to an atomic object while another thread reads from it, the behavior is well-defined (see [memory model](https://en.cppreference.com/w/cpp/language/memory_model) for details on data races)

In addition, accesses to atomic objects may establish inter-thread synchronization and order non-atomic memory accesses as specified by [std::memory_order](https://en.cppreference.com/w/cpp/atomic/memory_order).

std::atomic is neither copyable nor movable.
1
2
3
std :: atomic模板的每个实例化和完全特化都定义了一个原子类型。如果一个线程写入原子对象而另一个线程从中读取,则行为是明确定义的(有关数据争用的详细信息,请参阅内存模型)
此外,对原子对象的访问可以建立线程间同步并按 std::memory_order 的指定顺序进行非原子内存访问。
std::atomic既不可复制也不可移动。
Defined in header <atomic>
template< class T > struct atomic; (1) (since C++11)
template< class T > struct atomic<T*>; (2) (since C++11)
Defined in header <memory>

第一种形式的模版类型必须T是 TriviallyCopyable , 类型必须满足以下条件:

The program is ill-formed if any of following values is false:

出现以下错误:

1
error: _Atomic cannot be applied to type '**' which is not trivially copyable
Specializations :序列化

​ Primary template : 使用第一种形式的模版

​ Partial specializations : 使用第二种形式的模版

​ Specializations of std::atomic for integral types : 类似 std::atomic_bool typedef 类型 header头文件

​ Specializations of std::atomic for floating point types

Flag : atomic_flag

1
std::atomic_flag is an atomic boolean type. Unlike all specializations of std::atomic, it is guaranteed to be lock-free. Unlike std::atomic<bool>, std::atomic_flag does not provide load or store operations.
1
2
std::atomic_flag 是一种原子布尔类型。 与std::atomic的所有特化不同,它保证是无锁的。 
与std::atomic <bool>不同,std::atomic_flag不提供加载或存储操作。

使用语句std :: atomic_flag v = ATOMIC_FLAG_INIT;

定义可用于将std :: atomic_flag初始化为clear(false)状态的表达式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// A spinlock mutex can be implemented in userspace using an atomic_flag
#include <iostream>
#include <thread>
#include <atomic>

#include <vector>

// init atomic_flag to false-state
std::atomic_flag lock = ATOMIC_FLAG_INIT;

void func( int n )
{
for (int i = 0; i < 3; ++i)
{
// set var lock true-state and return last-value
while(lock.test_and_set(std::memory_order_acquire));

std::cout << "Output from thread " << i << " :" <<std::this_thread::get_id()<< '\n';
// set/clear var lock false-state ,
// warnning : param cannot be
// std::memory_order_consume,
// std::memory_order_acquire,
// std::memory_order_acq_rel
lock.clear(std::memory_order_release);

}
}

int main(int argc, char const *argv[])
{
std::vector<std::thread> v;
for (int i = 0; i < 3; ++i)
{

v.emplace_back(func , i);
}
// for(auto th: v)
// err : thread(const thread&) = delete;
for(auto & th: v)
{
th.join();
}

return 0;
}

memory_order

对于atomic对象操作有6种memory ordering 选项,

1
2
3
4
5
6
7
8
typedef enum memory_order {
memory_order_relaxed,
memory_order_consume,
memory_order_acquire,
memory_order_release,
memory_order_acq_rel,
memory_order_seq_cst
} memory_order;

默认情况下的为 memory_order_seq_cst。尽管有 6种选项,但是它们代表三种模型:

1
2
3
4
5
6
7
sequentially-consistent ordering(memory_order_seq_cst)
acquire-release ordering(
memory_order_consume,
memory_order_acquire,
memory_order_release,
memory_order_acq_rel)
relaxed ordering (memory_order_relaxed)。

另外需要注意的是对于不同的memory ordering运行 在不同的cpu架构的机器上运行的代价是不一样的,

比如对于对同步指令的需求 sequentially-consistent ordering模型大于acquire-release ordering或者relaxed ordering ,acquire-release ordering大于relaxed ordering ;

如果是运行在多处理器的操作系统上面,这些额外的同步指令开销可能会消耗重要的 cpu时间,从而造成总体系统性能的下降。对于x86或 x86-64架构的处理器在使用 acquire-release模型的时候不需要任何额外的指令开销,甚至是对于比较严格的sequentially consisten ordering 也不需要特殊处理,并且花费的代价也很少。

relaxed ordering :

​ 在这种模型下,std::atomicload()store()都要带上memory_order_relaxed参数。Relaxed ordering 仅仅保证load()store()是原子操作,除此之外,不提供任何跨线程的同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

// c++ cppmorder.cpp -o main -std=c++11

#include <iostream>
#include <thread>
#include <atomic>


std::atomic_int x(1);
std::atomic_int y(2);

void foo1()
{
int y1 = y.load(std::memory_order_relaxed); // A
x.store(y1,std::memory_order_relaxed); // B
std::cout << "foo1 :" << y1 << '\n';
}

void foo2()
{
int x1 = x.load(std::memory_order_relaxed); // C

y.store(99 , std::memory_order_relaxed); // D
std::cout << "foo2 :" << y << '\n';
}

int main(int argc, char const *argv[])
{
std::thread t1(foo1);

std::thread t2(foo2);

t1.join();
t2.join();

return 0;
}

执行完上面的程序,可能出现r1 == r2 == 99。理解这一点并不难,因为编译器允许调整 C 和 D 的执行顺序。如果程序的执行顺序是 D -> A -> B -> C,那么就会出现r1 == r2 == 99

应用场景: 如果某个操作只要求是原子操作,除此之外,不需要其它同步的保障,就可以使用 Relaxed ordering。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 程序计数器是一种典型的应用场景:
#include <iostream>
#include <thread>
#include <atomic>

#include <vector>

std::atomic_int cnt(0);

void func(int n)
{
for (int i = 0; i < 10; ++i)
{
cnt.fetch_add(1 , std::memory_order_relaxed);

std::cout << "cnt: " << cnt << " #thread:" << std::this_thread::get_id() << '\n';
}
}

int main(int argc, char const *argv[])
{
std::vector<std::thread> v;
for (int i = 0; i < 2; ++i)
{
v.emplace_back(func , i);
}


for(auto & th: v)
{
th.join();
}

return 0;
}

acquire-release ordering

在这种模型下,store()使用memory_order_release,而load()使用memory_order_acquire

这种模型有两种效果,

第一种是可以限制 CPU 指令的重排:在store()之前的所有读写操作,不允许被移动到这个store()的后面。在load()之后的所有读写操作,不允许被移动到这个load()的前面。  

除此之外,还有另一种效果:假设 Thread-1 store()的那个值,成功被 Thread-2 load()到了,那么 Thread-1 在store()之前对内存的所有写入操作,此时对 Thread-2 来说,都是可见的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <iostream>  
#include <thread>
#include <atomic>

std::atomic_bool ready{ false };

int data = 0;

void producer()
{
data = 100; // A
ready.store(true, std::memory_order_release); // B
std::cout << std::boolalpha <<"producer: " << ready << '\n';
}
void consumer()
{
std::cout << std::boolalpha <<"consumer: " << ready << '\n';
std::cout << "data:" << data << '\n';
while (!ready.load(std::memory_order_acquire)) // C
;
assert(data == 100); // never failed // D
}
int main()
{
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
return 0;
}
/* 过程:
1、首先 A 不允许被移动到 B 的后面。
2、同样 D 也不允许被移动到 C 的前面。
3、当 C 从 while 循环中退出了,说明 C 读取到了 B store()的那个值,
此时,Thread-2 保证能够看见 Thread-1 执行 B 之前的所有写入操作(也即是 A)。
*/

Reference:

  1. think cell talk memory model

  2. Acquire and Release Semantics