cpp11mutex

C++11 多线程- Mutual exclusion

1
2
3
Mutual exclusion algorithms prevent multiple threads from simultaneously accessing shared resources. This prevents data races and provides support for synchronization between threads.

互斥算法可防止多个线程同时访问共享资源。这可以防止数据争用并为线程之间的同步提供支持。

头文件 <mutex>

Mutex 系列:

mutex

​ 1、简介: https://en.cppreference.com/w/cpp/thread/mutex

1
2
3
4
5
6
7
8
9
10
11
12
13

互斥类是一个同步原语,可用于保护共享数据不被多个线程同时访问。

mutex 提供独立的、非递归所有权语义:
1、调用线程在成功调用lock或try_lock之前拥有一个互斥锁,直到它调用unlock。
2、当线程拥有互斥锁时,如果所有其他线程试图声明互斥锁的所有权,则所有其他线程将阻塞(对于锁定的调用)或接收错误的返回值(对于try_lock)。
3、在调用lock或try_lock之前,调用线程不得拥有互斥锁。

如果在仍由任何线程拥有的情况下销毁互斥锁,或者在拥有互斥锁时线程终止,则程序的行为是未定义的。

互斥类满足Mutex和StandardLayoutType的所有要求。

std::mutex 既不可复制也不可移动。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <iostream>
#include <thread>
#include <mutex>

#include <map>

std::mutex g_mutex;
std::map<std::string , std::string> g_map;

void save_page(const std::string url)
{
std::this_thread::sleep_for(std::chrono::seconds(2));
std::string result = "fake content";

// std::lock_guard<std::mutex> guard(g_mutex);
// g_map[url] = result;

g_mutex.lock();
g_map[url] = result;
g_mutex.unlock();

}

int main(int argc, char const *argv[])
{
std::thread t1(save_page, " http://foo");
std::thread t2(save_page, " http://bar");

t1.join();
t2.join();

for(const auto &pair : g_map)
{
std::cout << pair.first << "===>" << pair.second << '\n';
}

return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// Example:  try_lock 
#include <iostream>
#include <thread>
#include <mutex>

volatile int counter(0); // non-atomic counter

std::mutex mtx; // locks access to counter

void foo() {
for (int i=0; i<10; ++i) {
if (mtx.try_lock()) { // only increase if currently not locked:
++counter;
std::cout << std::boolalpha ;
std::cout << "A #counter: "<< counter \
<<" #thread id :(" << std::this_thread::get_id() << ")\n";
mtx.unlock();
}
else
{
// std::cout << "B #counter: "<< counter \
<<" #thread id :(" << std::this_thread::get_id() << ")\n";
}
}
}
int main (int argc, const char* argv[]) {
static const int n = 10;
std::thread threads[n];
for (int i=0; i<n; ++i)
threads[i] = std::thread(foo);

for (auto& th : threads)
th.join();
std::cout << counter << " successful increases of the counter.\n";

return 0;
}

timed_mutex

recursive_mutex

recursive_timed_mutex

Generic mutex management: 通用互斥 管理器(包装器)

Generic locking algorithms: 通用锁定 算法

Call once:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

#include <iostream>

#include <thread>
#include <mutex>

std::once_flag flag1, flag2;

void simple_do_once()
{
std::call_once(flag1, [](){ std::cout << "Simple example: called once\n"; });
}

void may_throw_function(bool do_throw)
{
if (do_throw) {
std::cout << "throw: call_once will retry\n"; // this may appear more than once
throw std::exception();
}
std::cout << "Didn't throw, call_once will not attempt again\n"; // guaranteed once
}

void do_once(bool do_throw)
{
try {
std::call_once(flag2, may_throw_function, do_throw);
}
catch ( const std::exception & e) {
std::cout << "exception:" << e.what() << "\n";
}
}

int main()
{
// std::thread st1(simple_do_once);
// std::thread st2(simple_do_once);
// std::thread st3(simple_do_once);
// std::thread st4(simple_do_once);
// st1.join();
// st2.join();
// st3.join();
// st4.join();

std::thread t1(do_once, true);
std::thread t2(do_once, true);
std::thread t3(do_once, false);
std::thread t4(do_once, true);
t1.join();
t2.join();
t3.join();
t4.join();
}
/*
即使从多个线程同时调用,也只执行一次Callable对象f。
详细说明:
1、如果在调用call_once时,flag指示f已经被调用,则call_once立即返回
(这样对call_once的调用称为被动)。
2、call_once使用参数 std::forward <Args>(args)...调用std::forward <Callable>(f)(就好像通过std::invoke)。与std::thread构造函数或std::async不同,不会移动或复制参数,因为它们不需要传输到另一个执行线程。 (这种对call_once的调用称为主动)。
2.1、如果该调用抛出异常,它将传播给call_once的调用者,并且不会翻转该标志,以便尝试另一个调用(这样调用call_once称为异常)。
2.2、如果该调用正常返回(这种对call_once的调用称为返回),则翻转该标志,并且保证所有其他具有相同标志的call_once调用都是被动的。
*/

头文件 <shared_mutex>

​ 涉及 C++14, c++17 暂时不整理

头文件 <condition_variable>

Condition variables

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
The condition_variable class is a synchronization primitive that can be used to block a thread, or multiple threads at the same time, until another thread both modifies a shared variable (the condition), and notifies the condition_variable.

condition_variable 类是一个同步原语,可用于同时阻塞一个线程或多个线程,直到另一个线程同时修改共享变量(the condition),并通知 condition_variable。

The thread that intends to modify the variable has to
1、acquire a std::mutex (typically via std::lock_guard)
2、perform the modification while the lock is held
3、execute notify_one or notify_all on the std::condition_variable (the lock does not need to be held for notification)

线程计划更改 varibale 必须满足以下:
1、 获取 std::mutex (一般通过 std::lock_guard)
2、 保持锁定时执行修改
3、 在 std::condition_variable 上执行 notify_one 或者 notify_all ( 锁不需要保持通知)

Even if the shared variable is atomic, it must be modified under the mutex in order to correctly publish the modification to the waiting thread.
1
2
3
4
5
6
7
8
9
10
std::condition_variable 提供了两种 wait() 函数。
解释:
当前线程调用 wait() 后将被阻塞(此时当前线程应该获得了锁(mutex),不妨设获得锁 lck),直到另外某个线程调用 notify_* 唤醒了当前线程。在线程被阻塞时,该函数会自动调用 lck.unlock() 释放锁,使得其他被阻塞在锁竞争上的线程得以继续执行。另外,一旦当前线程获得通知(notified,通常是另外某个线程调用 notify_* 唤醒了当前线程),wait() 函数也是自动调用 lck.lock(),使得 lck 的状态和 wait 函数被调用时相同。

在第二种情况下(即设置了 Predicate),只有当 pred 条件为 false 时调用 wait() 才会阻塞当前线程,并且在收到其他线程的通知后只有当 pred 为 true 时才会被解除阻塞。因此第二种情况类似以下代码:
/*
while (!pred()) {
wait(lock);
}
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>

std::mutex g_mutex;
std::condition_variable g_cv;
int cargo = 0;

void consume()
{
std::unique_lock<std::mutex> lk(g_mutex);
// g_cv.wait(lk);
// {
// std::cerr << "cargo :" << cargo << "\n" ;
// }

// parm is true unblock
g_cv.wait(lk , [](){ return cargo == 1;});
{
std::cerr << "#consume cargo :" << cargo << "\n" ;
}
}
void create()
{
std::this_thread::sleep_for(std::chrono::seconds(1));
{
std::lock_guard<std::mutex> lk(g_mutex);
++cargo;
std::cerr << std::this_thread::get_id() <<" notify...\n";
g_cv.notify_all();
}

std::this_thread::sleep_for(std::chrono::seconds(1));
{
std::lock_guard<std::mutex> lk(g_mutex);
++cargo;
std::cerr << std::this_thread::get_id() <<" notify again...\n";
g_cv.notify_all();
}

}
int main(int argc, char const *argv[])
{
std::thread cth(consume);

std::thread cth2(create);
std::thread cth3(create);
std::thread cth4(create);
//
cth.join();
cth2.join();
cth3.join();
cth4.join();

std::this_thread::sleep_for(std::chrono::seconds(2));
std::cerr << std::this_thread::get_id() << ": " << cargo <<" \n";

return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

#include <iostream>
#include <thread>
#include <mutex>

#include <condition_variable>

std::mutex g_mutex ;
std::condition_variable g_cv;

int g_num = 0;
bool dosome = false;

void consume()
{
// while(true)
// {
// std::unique_lock<std::mutex> ul(g_mutex);
// g_cv.wait(ul , []{return dosome; });
// std::cerr << "consume : " << g_num <<"\n";
// dosome = false;
// }
while(!dosome)
{
std::unique_lock<std::mutex> ul(g_mutex);
g_cv.wait(ul);
std::cerr << "consume : " << g_num <<"\n";
dosome = false;
}
}


void product( int n)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
std::lock_guard<std::mutex> lg(g_mutex);
g_num += n;
dosome = true;
std::cerr << "Notifying All...\n";
g_cv.notify_all();

}

int main(int argc, char const *argv[])
{
std::thread th(consume);
std::thread th2(product,1);
std::thread th3(product,2),th4(product,3);

th.join();
th2.join();
th3.join();
th4.join();


return 0;

}

生产者-消费者模型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65

#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>


std::mutex g_mutex;
std::condition_variable g_produce, g_consume;

std::queue<int> g_queue;

static const int maxSize = 10;

void consumer()
{
while (true)
{
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
// RAII,程序运行到此block的外面(进入下一个while循环之前),资源(内存)自动释放
std::unique_lock<std::mutex> lck(g_mutex);
// wait(block) consumer until q.size() != 0 is true
g_consume.wait(lck, [] {return g_queue.size() != 0; });
std::cout << "#consumer " << std::this_thread::get_id() << ": ";
g_queue.pop();
std::cout << g_queue.size() << '\n';
// nodity(wake up) producer when q.size() != maxSize is true
g_produce.notify_all();
}
}

void producer()
{
while (true)
{

std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::unique_lock<std::mutex> lck(g_mutex);
// wait(block) producer until q.size() != maxSize is true
g_produce.wait(lck, [] {return g_queue.size() != maxSize; });

std::cout << "-> producer " << std::this_thread::get_id() << ": ";
g_queue.push(99);
std::cout << g_queue.size() << '\n';
// notify(wake up) consumer when q.size() != 0 is true
g_consume.notify_all();
}
}

int main(int argc, char const *argv[])
{
std::vector<std::thread> vcthread;
vcthread.emplace_back(producer);
vcthread.emplace_back(consumer);

vcthread.emplace_back(producer);
vcthread.emplace_back(consumer);
vcthread.emplace_back(producer);
vcthread.emplace_back(consumer);

for( auto & th : vcthread)
th.join();
return 0;
}

wait 使用中产生的 a spurious wakeup occurs, 用以下方式可以 避免

1
2
3
while (!pred()) {
wait(lock);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77

#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>


std::mutex g_mutex;
std::condition_variable g_produce, g_consume;

std::queue<int> g_queue;

static const int maxSize = 10;

void consumer()
{
//while(true)
{
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
// RAII,程序运行到此block的外面(进入下一个while循环之前),资源(内存)自动释放
std::unique_lock<std::mutex> lck(g_mutex);
// wait(block) consumer until q.size() != 0 is true

//g_consume.wait(lck, [] {return g_queue.size() !=0 ; });
while(g_queue.size() < 4)
{
g_consume.wait(lck);
}
std::cout << "#consumer " << std::this_thread::get_id() << ": ";
if(g_queue.size() > 0) g_queue.pop();
std::cout << g_queue.size() << '\n';
// nodity(wake up) producer when q.size() != maxSize is true
if(g_queue.size() < 10)
{
g_produce.notify_all();
}
}
}

void producer()
{
//while (true)
{
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::unique_lock<std::mutex> lck(g_mutex);
// wait(block) producer until q.size() != maxSize is true
//g_produce.wait(lck, [] {return g_queue.size() != maxSize; });
while(g_queue.size() > 10)
{
g_produce.wait(lck);
}
std::cout << "-> producer " << std::this_thread::get_id() << ": ";
g_queue.push(99);
std::cout << g_queue.size() << '\n';
// notify(wake up) consumer when q.size() != 0 is true
if(g_queue.size() >= 4)
{
g_consume.notify_all();
}
}
}

int main(int argc, char const *argv[])
{
std::vector<std::thread> vcthread;

for (int i = 0; i < 20; ++i)
{
vcthread.emplace_back(producer);
vcthread.emplace_back(consumer);
}

for( auto & th : vcthread)
th.join();
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71

#include <iostream>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>


std::mutex g_mutex;
std::condition_variable g_produce, g_consume;

std::queue<int> g_queue;

static const int maxSize = 100;

static const int consumeSize = 5;
static const int productSize = 10;
void consumer()
{

while(g_queue.size() <= maxSize )
{
std::this_thread::sleep_for(std::chrono::milliseconds(1000));

std::unique_lock<std::mutex> lck(g_mutex);

g_consume.wait(lck);
std::cout << "@@@-> consumer " << ": ";
std::cout << std::this_thread::get_id() << " : ";
if(g_queue.size() > 0) g_queue.pop();
std::cout << g_queue.size() << '\n';
// nodity(wake up) producer
g_produce.notify_all();

}
}

void producer()
{
while(g_queue.size() <= maxSize)
{
std::this_thread::sleep_for(std::chrono::milliseconds(1000));

std::unique_lock<std::mutex> lck(g_mutex);

g_produce.wait_for(lck,std::chrono::seconds(1));

std::cout << "###-> producer " << ": ";
std::cout << std::this_thread::get_id() << " : ";
g_queue.push(99);
std::cout << g_queue.size() << '\n';
// notify(wake up) consumer
g_consume.notify_all();

}
}

int main(int argc, char const *argv[])
{
std::vector<std::thread> vcthread;

for (int i = 0; i < 2; ++i)
{
vcthread.emplace_back(producer);
vcthread.emplace_back(consumer);
}

for( auto & th : vcthread)
th.join();
return 0;
}