C++11: 多线程 future/promise简介

/ C++ / 没有评论 / 2013浏览

std::future

简单来说,std::future提供了一种访问异步操作结果的机制。

从字面意思看,它表示未来。通常一个异步操作我们是不能马上就获取操作结果的,只能在未来某个时候获取。我们可以以同步等待的方式来获取结果,可以通过查询future的状态(future_status)来获取异步操作的结果。future_status有三种状态: deferred:异步操作还没开始 ready:异步操作已经完成 timeout:异步操作超时

获取future结果有三种方式:get、wait、wait_for,其中get等待异步操作结束并返回结果,wait只是等待异步操作完成,没有返回值,wait_for是超时等待返回结果。

例子:

//查询future的状态
    std::future_status status;
    do {
        status = future.wait_for(std::chrono::seconds(1));
        if (status == std::future_status::deferred) {
            std::cout << "deferred\n";
        } else if (status == std::future_status::timeout) {
            std::cout << "timeout\n";
        } else if (status == std::future_status::ready) {
            std::cout << "ready!\n";
        }
    } while (status != std::future_status::ready);

std::promise

Promise对象可保存T类型的值,该值可被future对象读取(可能在另一个线程中),这是promise提供同步的一种手段。在构造promise时,promise对象可以与共享状态关联起来,这个共享状态可以存储一个T类型或者一个由std::exception派生出的类的值,并可以通过get_future来获取与promise对象关联的对象,调用该函数之后,两个对象共享相同的共享状态(shared state)。 Promise对象是异步provider,它可以在某一时刻设置共享状态的值。 Future对象可以返回共享状态的值,或者在必要的情况下阻塞调用者并等待共享状态标识变为ready,然后才能获取共享状态的值。

例子:

#include <iostream>       // std::cout
#include <functional>     // std::ref
#include <thread>         // std::thread
#include <future>         // std::promise, std::future

void print_int(std::future<int>& fut) {
    int x = fut.get(); // 获取共享状态的值.
    std::cout << "value: " << x << '\n'; // 打印 value: 10.
}

int main ()
{
    std::promise<int> prom; // 生成一个 std::promise<int> 对象.
    std::future<int> fut = prom.get_future(); // 和 future 关联.
    std::thread t(print_int, std::ref(fut)); // 将 future 交给另外一个线程t.
    prom.set_value(10); // 设置共享状态的值, 此处和线程t保持同步.
    t.join();
    return 0;
}

std::promise 构造函数

构造函数column3
default (1)promise();
with allocator (2)template promise (allocator_arg_t aa, const Alloc& alloc);
copy [deleted] (3)promise (const promise&) = delete;
move (4)promise (promise&& x) noexcept;
  1. 默认构造函数,初始化一个空的共享状态
  2. 带自定义内存分配器的构造函数,与默认构造函数类似,但是使用自定义分配器来分配共享状态。
  3. 拷贝构造函数,被禁用。
  4. 移动构造函数。

另外,std::promise 的 operator= 没有拷贝语义,即 std::promise 普通的赋值操作被禁用,operator= 只有 move 语义,所以 std::promise 对象是禁止拷贝的。

std::promise 成员函数 std::promise::get_future:返回一个与promise共享状态相关联的future对象 std::promise::set_value:设置共享状态的值,此后promise共享状态标识变为ready std::promise::set_exception:为promise设置异常,此后promise的共享状态标识变为ready std::promise::set_value_at_thread_exit:设置共享状态的值,但是不将共享状态的标志设置为 ready,当线程退出时该 promise 对象会自动设置为 ready(注意:该线程已设置promise的值,如果在线程结束之后有其他修改共享状态值的操作,会抛出future_error(promise_already_satisfied)异常) std::promise::swap:交换 promise 的共享状态

std::packaged_task

std::packaged_task包装了一个可调用的目标(如function, lambda expression, bind expression, or another function object),以便异步调用,它和promise在某种程度上有点像,promise保存了一个共享状态的值,而packaged_task保存的是一个函数。

    std::packaged_task<int()> task([](){ return 7; });
    std::thread t1(std::ref(task)); 
    std::future<int> f1 = task.get_future(); 
    auto r1 = f1.get();

Promise,Future 和 Callback常常作为并发编程中一组非阻塞的模型。其中 Future 表示一个可能还没有实际完成的异步任务的【结果】,针对这个结果可以添加 Callback 以便在任务执行成功或失败后做出对应的操作,而 Promise 交由任务执行者,任务执行者通过 Promise 可以标记任务完成或者失败。

std::async

std::async大概的工作过程:先将异步操作用std::packaged_task包装起来,然后将异步操作的结果放到std::promise中,这个过程就是创造未来的过程。外面再通过future.get/wait来获取这个未来的结果。

可以说,std::async帮我们将std::future、std::promise和std::packaged_task三者结合了起来。

std::async的原型:

async(std::launch::async | std::launch::deferred, f, args...)

第一个参数是线程的创建策略,默认的策略是立即创建线程:

std::launch::async:在调用async就开始创建线程。 std::launch::deferred:延迟加载方式创建线程。调用async时不创建线程,直到调用了future的get或者wait时才创建线程。 第二个参数是线程函数,后面的参数是线程函数的参数。

例子:

 std::future<int> f1 = std::async(std::launch::async, [](){ 
        return 8;  
    }); 

    cout<<f1.get()<<endl; //output: 8

    std::future<int> f2 = std::async(std::launch::async, [](){ 
        cout<<8<<endl;
    }); 

    f2.wait(); //output: 8
#include <iostream>
#include <future>
#include <memory>
#include <windows.h>
#include <thread>

std::string helloFunction(const std::string& s) {
	Sleep(5000);
	return "Hello C++11 from " + s + ".";
}

int main()
{
	auto query_promise = std::make_shared<std::promise<std::string>>();
	auto query_feature = query_promise->get_future();
	
	std::string a = "function";

	std::thread t([query_promise](const std::string& s) {
		//std::cout << "start" << std::endl;
		std::cout << query_promise.use_count() << std::endl;
		Sleep(3000);
		query_promise->set_value(s);
	}, a);

	std::cout << "main thread" << std::endl;
	
	auto st = query_feature.wait_for(std::chrono::milliseconds(1000));
	
	if (st == std::future_status::ready) 
	{
		if (query_feature.valid())
		{
			std::cout << query_feature.get().c_str() << std::endl;
		}
		t.join();
	}
	else if (st == std::future_status::timeout)
	{
		std::cout << "timeout" << std::endl;
		t.join();
	}
	
	system("PAUSE");
	return 0;
}