C++并行开发14-补充学习

FengLY Lv3

前言

主要是C++异步和同步线程进行总结

异步编程

线程同步主要是为了解决对共享数据的竞争访问问题,所以线程同步主要是对共享数据的访问同步化(按照既定的先后次序,一个访问需要阻塞等待前一个访问完成后才能开始)。这篇文章谈到的异步编程主要是针对任务或线程的执行顺序,也即一个任务不需要阻塞等待上一个任务执行完成后再开始执行,程序的执行顺序与任务的排列顺序是不一致的。下面从任务执行顺序的角度解释下同步与异步的区别:
同步:就是在发出一个调用时,在没有得到结果之前,该调用就不返回。但是一旦调用返回,就得到返回值了。换句话说,就是由调用者主动等待这个调用的结果。
异步:调用在发出之后,这个调用就直接返回了,所以没有返回结果。换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果。而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用。
多线程是实现异步编程的一种方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <vector>
#include <thread>
#include <future>
#include <numeric>
#include <iostream>
#include <chrono>

int accumulate(std::vector<int>::iterator first,
std::vector<int>::iterator last)
{
for(int i = 0; i < 1000; i++){
std::cout << "accumulate ------------------------------- " << std::endl;
}
int sum = std::accumulate(first, last, 0);
return sum;
}

int main()
{
// 演示用 async 在线程间传递结果。
std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };
auto accumulate_future = std::async(std::launch::async, accumulate, numbers.begin(), numbers.end()); //auto可以自动推断变量的类型

//多线程是实现异步编程的一种方式

for(int i = 0; i < 1000; i++){
std::cout << "main ***********************" << std::endl;
}


std::cout << "result=" << accumulate_future.get() << '\n';

getchar();
return 0;
}

这部分的输出中accumulate 和main 是并存的

std::async

C++11中的std::async是个模板函数。std::async异步调用函数,在某个时候以Args作为参数(可变长参数)调用Fn,无需等待Fn执行完成就可返回,返回结果是个std::future对象。Fn返回的值可通过std::future对象的get成员函数获取。一旦完成Fn的执行,共享状态将包含Fn返回的值并ready。
std::async有两个版本:
1.无需显示指定启动策略,自动选择,因此启动策略是不确定的,可能是launch::async,也可能是launch::deferred,或者是两者的任意组合,取决于它们的系统和特定库实现。
2.允许调用者选择特定的启动策略。 std::async的启动策略类型是个枚举类enum class launch,包括:

  1. launch::async:异步,启动一个新的线程调用Fn,该函数由新线程异步调用,并且将其返回值与共享状态的访问点同步。

  2. launch::deferred:延迟,在访问共享状态时该函数才被调用。对Fn的调用将推迟到返回的std::future的共享状态被访问时(使用std::future的wait或get函数)。

参数Fn:可以为函数指针、成员指针、任何类型的可移动构造的函数对象(即类定义了operator()的对象)。Fn的返回值或异常存储在共享状态中以供异步的std::future对象检索。
参数Args:传递给Fn调用的参数,它们的类型应是可移动构造的。
返回值:当Fn执行结束时,共享状态的std::future对象准备就绪。std::future的成员函数get检索的值是Fn返回的值。当启动策略采用launch::async时,即使从不访问其共享状态,返回的std::future也会链接到被创建线程的末尾。在这种情况下,std::future的析构函数与Fn的返回同步。
详细用法见下面的测试代码,下面是从其他文章中copy的测试代码,部分作了调整,详细内容介绍可以参考对应的reference:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
/*
* @Author: your name
* @Date: 2021-01-25 11:16:44
* @LastEditTime: 2021-01-29 17:44:42
* @LastEditors: Please set LastEditors
* @Description: In User Settings Edit
* @FilePath: /myCOde/app/future.cpp
*/
#include <iostream>
#include <thread>
#include <algorithm>
#include <vector>
#include <list>
#include <mutex>
#include <future> //引入std::future头文件
using namespace std;


int test_async_1()
{
auto is_prime = [](int x) {
std::cout << "Calculating. Please, wait...\n";
for (int i = 2; i < x; ++i) if (x%i == 0) return false;
return true;
};

// call is_prime(313222313) asynchronously:
std::future<bool> fut = std::async(is_prime, 313222313); //执行这一句的时候就已经开始调用函数了
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
std::cout << "Checking whether 313222313 is prime.\n";
// ...

bool ret = fut.get(); // waits for is_prime to return
if (ret) std::cout << "It is prime!\n";
else std::cout << "It is not prime.\n";

return 0;
}

int test_async_2()
{
auto print_ten = [](char c, int ms) {
for (int i = 0; i < 10; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(ms));
std::cout << c;
}
};

std::cout << "with launch::async:\n";
std::future<void> foo = std::async(std::launch::async, print_ten, '*', 100);
std::future<void> bar = std::async(std::launch::async, print_ten, '@', 200);
// async "get" (wait for foo and bar to be ready):
foo.get(); // 注:注释掉此句,也会输出'*'
bar.get();
std::cout << "\n\n";
std::this_thread::sleep_for(std::chrono::milliseconds(2000));

std::cout << "with launch::deferred:\n";
foo = std::async(std::launch::deferred, print_ten, '*', 100);
bar = std::async(std::launch::deferred, print_ten, '@', 200);
// deferred "get" (perform the actual calls):
//foo.get(); // 注:注释掉此句,则不会输出'**********'
//bar.get();
std::cout << '\n';

return 0;
}



std::mutex m;

struct X {
void foo(int i, const std::string& str) {
std::lock_guard<std::mutex> lk(m);
std::cout << str << ' ' << i << '\n';
}
void bar(const std::string& str) {
std::lock_guard<std::mutex> lk(m);
std::cout << str << '\n';
}
int operator()(int i) {
std::lock_guard<std::mutex> lk(m);
std::cout << i << '\n';
return i + 10;
}
};

int test_async_3()
{
X x;
// Calls (&x)->foo(42, "Hello") with default policy:
// may print "Hello 42" concurrently or defer execution
auto a1 = std::async(&X::foo, &x, 42, "Hello");
// Calls x.bar("world!") with deferred policy
// prints "world!" when a2.get() or a2.wait() is called
auto a2 = std::async(std::launch::deferred, &X::bar, x, "world!");
// Calls X()(43); with async policy
// prints "43" concurrently
auto a3 = std::async(std::launch::async, X(), 43);
a2.wait(); // prints "world!"
std::cout << a3.get() << '\n'; // prints "53"

return 0;
} //

int main(){
test_async_1();
test_async_2();
return 0;
}

std::promise

C++11中的std::promise是个模板类。一个std::promise对象可以存储由future对象(可能在另一个线程中)检索的T类型的值或派生自std::exception的异常,并提供一个同步点。
在构造std::promise对象时,该对象与新的共享状态(shared
state)关联。通过调用std::promise的get_future函数,可以将该共享状态与std::future对象关联。调用之后,两个对象共享相同的共享状态:(1).std::promise对象是异步提供程序(asynchronous
provider),应 在某个时刻为共享状态设置一个值。(2).std::future对象是个异步返回对象,可以检索共享状态的值,并在必要时等待其准备就绪。
模板类std::promise成员函数包括:

  1. 构造函数:(1).默认构造函数:通过访问新的空共享状态来初始化对象(The object is initialized with access to a new empty shared state)。(2).带allocator的构造函数。(3).禁用拷贝构造。(4).支持移动构造。

  2. 析构函数:(1).放弃(abandon)共享状态并销毁promise对象。(2).如果有其它future对象关联到同一共享状态,则共享状态本身不会被销毁。(3).如果promise对象在共享状态准备就绪前被销毁,则共享状态自动准备就绪并包含一个std::future_error类型的异常。

  3. get_future函数:(1).返回一个与promise对象的共享状态关联的std::future对象。(2).一旦准备就绪,返回的std::future对象就可以访问promise对象在共享状态上设置的值或异常。(3).每个promise共享状态只能被一个std::future对象检索(Only one future object can be retrieved for each promise shared state)。(4).调用此函数后,promise应在某个时候使其共享状态准备就绪(通过设置值或异常),否则将在销毁时自动准备就绪并包含一个std::future_error类型的异常。

  4. operator=:(1).禁用拷贝赋值。(2).支持移动赋值。

  5. set_exception:将异常指针存储进共享状态即设置共享状态的异常指针,准备就绪。

  6. set_exception_at_thread_exit:设置共享状态的异常指针,但并不将该共享状态的标志设置为ready,当线程退出时,该promise对象会自动设置为ready (Stores the exception pointer p in the shared state without making it ready immediately. Instead, it will be made ready automatically at thread exit, once all objects of thread storage duration have been destroyed)。

  7. set_value:(1).将值存储进共享状态即设置共享状态的值,准备就绪。(2).set_value(void)只是简单使共享状态就绪而无须设置任何值。

  8. set_value_at_thread_exit:设置共享状态的值,但并不将该共享状态的标志设置为ready,当线程退出时,该promise对象会自动设置为ready(Stores val as the value in the shared state without making it ready immediately. Instead, it will be made ready automatically at thread exit, once all objects of thread storage duration have been destroyed)。

  9. swap/非成员模板函数swap:交换共享状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
/*
* @Author: your name
* @Date: 2021-01-25 11:16:44
* @LastEditTime: 2021-01-29 20:18:12
* @LastEditors: Please set LastEditors
* @Description: In User Settings Edit
* @FilePath: /myCOde/app/future.cpp
*/
#include <iostream>
#include <thread>
#include <algorithm>
#include <vector>
#include <list>
#include <mutex>
#include <future> //引入std::future头文件
#include <numeric>

using namespace std;

void test_promise_1(){
{
std::promise<int> foo; // create promise
std::promise<int> bar = std::promise<int>(std::allocator_arg, std::allocator<int>());

std::future<int> fut = bar.get_future(); // engagement with future
auto print_int = [&fut]() { int x = fut.get(); fprintf(stdout, "value: %d\n", x); };
std::thread th1(print_int); // send future to new thread
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
cout << "set value ... " << endl;
bar.set_value(10);
cout << "set value over " << endl;
th1.join();
}
{
std::promise<int> prom;
std::future<int> fut = prom.get_future();

auto get_int = [&prom]() {
int x;
std::cout << "Please, enter an integer value: ";
std::cin.exceptions(std::ios::failbit); // throw on failbit
try {
std::cin >> x; // sets failbit if input is not int
prom.set_value(x);
} catch (std::exception&) {
prom.set_exception(std::current_exception());
}
};

auto print_int = [&fut]() {
try {
int x = fut.get();
std::cout << "value: " << x << '\n';
} catch (std::exception& e) {
std::cout << "[exception caught: " << e.what() << "]\n";
}
};

std::thread th1(print_int);
std::thread th2(get_int);

th1.join();
th2.join();
}
}


//这里已经传引用
void myAccumulate(std::vector<int>::iterator first, std::vector<int>::iterator last, std::promise<int> accumulate_promise)
{
int sum = std::accumulate(first, last, 0);
accumulate_promise.set_value(sum); // Notify future
}

void do_work(std::promise<void> barrier)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
barrier.set_value();
}

int test_promise_2(){
std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };
std::promise<int> accumulate_promise;
std::future<int> accumulate_future = accumulate_promise.get_future();
std::thread work_thread(myAccumulate, numbers.begin(), numbers.end(), std::move(accumulate_promise));
std::cout << "result=" << accumulate_future.get() << '\n';
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); //这里加入延迟的目的是为了可以更加真实的看出当前的运行流程
cout << "accumulate_future get " << endl;
work_thread.join(); // wait for thread completion

// Demonstrate using promise<void> to signal state between threads.
cout << "run to this " << endl;
std::promise<void> barrier;
std::future<void> barrier_future = barrier.get_future();
std::thread new_work_thread(do_work, std::move(barrier));
barrier_future.wait();
new_work_thread.join();
}


void test_peomise_3(){
std::promise<int> p;
std::future<int> f = p.get_future();

std::thread([&p] {
std::this_thread::sleep_for(1s);
p.set_value_at_thread_exit(9); // gcc 4.9 don't support this function
}).detach();

std::cout << "Waiting..." << std::flush;
f.wait();
std::cout << "Done!\nResult is: " << f.get() << '\n';
}
int main(){
test_promise_1();
test_promise_2();
test_peomise_3();
return 0;
}

std::packaged_task

C++11中的std::packaged_task是个模板类。std::packaged_task包装任何可调用目标(函数、lambda表达式、bind表达式、函数对象)以便它可以被异步调用。它的返回值或抛出的异常被存储于能通过std::future对象访问的共享状态中。
std::packaged_task类似于std::function,但是会自动将其结果传递给std::future对象。
std::packaged_task对象内部包含两个元素:(1).存储的任务(stored
task)是一些可调用的对象(例如函数指针、成员或函数对象的指针)( A stored task, which is some callable
object (such as a function pointer, pointer to member or function
object))。(2).共享状态,它可以存储调用存储的任务(stored task)的结果,并可以通过std::future进行异步访问(A shared
state, which is able to store the results of calling the stored task and be
accessed asynchronously through a future)。
通过调用std::packaged_task的get_future成员将共享状态与std::future对象关联。调用之后,两个对象共享相同的共享状态:(1).std::packaged_task对象是异步提供程序(asynchronous
provider),应通过调用存储的任务(stored
task)在某个时刻将共享状态设置为就绪。(2).std::future对象是一个异步返回对象,可以检索共享状态的值,并在必要时等待其准备就绪。
共享状态的生存期至少要持续到与之关联的最后一个对象释放或销毁为止。 std::packaged_task不会自己启动,你必须调用它(A
packaged_task won’t start on it’s own, you have to invoke it)。
模板类std::packaged_task成员函数包括:

  1. 构造函数:(1).默认构造函数:无共享状态无存储任务(no shared state and no stored task)情况下初始化对象。(2). initialization constructor:该对象具有共享状态,且其存储的任务由fn初始化。(3). initialization constructor with allocator。(4).禁用拷贝构造。(5).支持移动构造。

  2. 析构函数:(1).放弃(abandon)共享状态并销毁packaged_task对象。(2). 如果有其它future对象关联到同一共享状态,则共享状态本身不会被销毁。(3). 如果packaged_task对象在共享状态准备就绪前被销毁,则共享状态自动准备就绪并包含一个std::future_error类型的异常。

  3. get_future函数:(1).返回一个与packaged_task对象的共享状态关联的std::future对象。(2).一旦存储的任务被调用,返回的std::future对象就可以访问packaged_task对象在共享状态上设置的值或异常。(3).每个packaged_task共享状态只能被一个std::future对象检索(Only one future object can be retrieved for each packaged_task shared state)。(4).调用此函数后,packaged_task应在某个时候使其共享状态准备就绪(通过调用其存储的任务),否则将在销毁后自动准备就绪并包含一个std::future_error类型的异常。

  4. make_ready_at_thread_exit函数:在线程退出时才使共享状态ready而不是在调用完成后就立即ready。

  5. operator=:(1).禁用拷贝赋值。(2).支持移动赋值。

  6. operator():(1).call stored task。(2).如果对存储任务的调用成功完成或抛出异常,则返回的值或捕获的异常存储在共享状态,共享状态准备就绪(解除阻塞当前等待它的所有线程)。

  7. reset函数:(1).在保持相同存储的任务的同时,以新的共享状态重置对象。(2).允许再次调用存储的任务。(3).与对象关联的之前的共享状态被放弃(就像packaged_task被销毁了一样)。(4).在内部,该函数的行为就像是移动赋值了一个新构造的packaged_task一样(Internally, the function behaves as if move-assigned a newly constructed packaged_task (with its stored task as argument))。

  8. swap函数/非成员模板函数swap:交换共享状态和存储的任务(stored task)。

  9. valid函数:检查packaged_task对象是否具有共享状态。

packaged_task想要实现异步,必须调用thread,创建一个新的线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
/*
* @Author: your name
* @Date: 2021-01-25 11:16:44
* @LastEditTime: 2021-01-29 21:09:55
* @LastEditors: Please set LastEditors
* @Description: In User Settings Edit
* @FilePath: /myCOde/app/future.cpp
*/
#include <iostream>
#include <thread>
#include <algorithm>
#include <vector>
#include <list>
#include <mutex>
#include <future> //引入std::future头文件
#include <numeric>
#include <functional>
#include <cmath>

using namespace std;


void test_packaged_task_1(){
{
std::packaged_task<int(int)> foo;
std::packaged_task<int(int)> bar([](int x){ return x * 2;});

foo = std::move(bar); //右移
std::cout << "vaild: " << foo.valid() << endl;
std::cout << "vaild: " << bar.valid() << endl;
std::future<int> ret = foo.get_future(); //get future
std::thread(std::move(foo), 10).detach();

int value = ret.get();
std::cout << "The double of 10 is " << value << ".\n";
}

{ // reset/operator()
std::packaged_task<int(int)> tsk([](int x) { return x * 3; }); // package task

std::future<int> fut = tsk.get_future();
tsk(33);
std::cout << "The triple of 33 is " << fut.get() << ".\n";

// re-use same task object:
tsk.reset();
fut = tsk.get_future();
std::thread(std::move(tsk), 99).detach();
std::cout << "Thre triple of 99 is " << fut.get() << ".\n";
}
{
auto countdown = [](int from, int to) {
for (int i = from; i != to; --i) {
std::cout << i << '\n';
std::this_thread::sleep_for(std::chrono::seconds(1));
}
std::cout << "Lift off!\n";
return from - to;
};

std::packaged_task<int(int, int)> tsk(countdown); // set up packaged_task
std::future<int> ret = tsk.get_future(); // get future

std::thread th(std::move(tsk), 5, 0); // spawn thread to count down from 5 to 0

int value = ret.get(); // wait for the task to finish and get result
std::cout << "The countdown lasted for " << value << " seconds.\n";

th.join();
}
}



int test_packaged_task_2()
{
{ // lambda
std::packaged_task<int(int, int)> task([](int a, int b) { return std::pow(a, b);});
std::future<int> result = task.get_future();

task(2, 9);
std::cout << "task_lambda:\t" << result.get() << '\n';
}

{ // bind
std::packaged_task<int()> task(std::bind([](int x, int y) { return std::pow(x, y); }, 2, 11));
std::future<int> result = task.get_future();

task();
std::cout << "task_bind:\t" << result.get() << '\n';
}

{ // thread
std::packaged_task<int(int, int)> task([](int x, int y) { return std::pow(x, y); });
std::future<int> result = task.get_future();

std::thread task_td(std::move(task), 2, 10);
task_td.join();
std::cout << "task_thread:\t" << result.get() << '\n';
}

return 0;
}


struct DBDataFetcher {
std::string operator()(std::string token)
{
// Do some stuff to fetch the data
std::string data = "Data From " + token;
return data;
}
};

//使用仿函数
int test_packaged_task_3()
{
// Create a packaged_task<> that encapsulated a Function Object
std::packaged_task<std::string(std::string)> task(std::move(DBDataFetcher()));

// Fetch the associated future<> from packaged_task<>
std::future<std::string> result = task.get_future();

// Pass the packaged_task to thread to run asynchronously
std::thread th(std::move(task), "Arg");

// Join the thread. Its blocking and returns when thread is finished.
th.join();

// Fetch the result of packaged_task<> i.e. value returned by getDataFromDB()
std::string data = result.get();
std::cout << data << std::endl;

return 0;
}


int test_packaged_task_4()
{
// sleeps for one second and returns 1
auto sleep = []() {
// // for(int i = 0; i < 1000; i++){
// // cout << "packaged task ----------" << i << endl;
// // }
// std::this_thread::sleep_for(std::chrono::seconds(100));
return 1;
};

{ // std::packaged_task
// >>>>> A packaged_task won't start on it's own, you have to invoke it
std::packaged_task<int()> task(sleep);

auto f = task.get_future();
task(); // invoke the function
for(int i = 0; i < 1000; i++){
cout << "main -----------" << i << endl;
}
// You have to wait until task returns. Since task calls sleep
// you will have to wait at least 1 second.
std::cout << "You can see this after 1 second\n";

// However, f.get() will be available, since task has already finished.
std::cout << f.get() << std::endl;
}

{ // std::async
// >>>>> On the other hand, std::async with launch::async will try to run the task in a different thread :
auto f = std::async(std::launch::async, sleep);
std::cout << "You can see this immediately!\n";

// However, the value of the future will be available after sleep has finished
// so f.get() can block up to 1 second.
std::cout << f.get() << "This will be shown after a second!\n";
}

return 0;
}


int main(){
// test_packaged_task_1();
test_packaged_task_4();
return 0;
}

参考资料

异步和同步的区别
C++多线程并发—异步编程
并发和并行,异步与多线程区别
C++11中std::future的使用
std::aysnc
std::promise
std::packaged_task

  • Title: C++并行开发14-补充学习
  • Author: FengLY
  • Created at : 2023-06-18 22:40:09
  • Updated at : 2023-06-18 23:00:02
  • Link: https://zhouaq.com/2023/06/18/C++并行开发14-补充学习/
  • License: This work is licensed under CC BY-NC-SA 4.0.
Comments