C++:多线程

/ C++ / 没有评论 / 2451浏览

Lambda表达式

//Lambda表达式:[]捕获列表()参数列表{}函数主体
1.[var]表示值传递方式捕捉变量var;
2.[=]表示值传递方式捕捉所有父作用域的变量(包括this);
3.[&var]表示引用传递捕捉变量var;
4.[&]表示引用传递方式捕捉所有父作用域的变量(包括this);
5.[this]表示值传递方式捕捉当前的this指针 /
#include <iostream>
#include <thread>
#include <mutex>
#include <chrono>
std::mutex g_lock;   //独占的互斥量,不能递归使用
void func()
{
	g_lock.lock();
	std::cout << "entered thread" << std::this_thread::get_id() << std::emdl;
	std::this_thread::sleep_for(std::chrono::seconds(1));
	std::cout << "leave thread" << std::this_thread::get_id() << std::endl;
	g_lock.unlock();
	/*
	std::lock_guard <std::mutex> locker(g_lock);出作用域之后自动解锁
	*/
}
int main()
{
	std::thread t1(func);
	std::thread t2(func);
	std::thread t3(func);
	t1.join();
	t2.join();
	t3.join();
	return 0;
}

//递归互斥量,不带超时功能 //用来解决统一线程需要多次获取互斥量时死锁的问题

  std::timed_mutex mutex;
void work()
{
	std::chrono::milliseconds timeout(100); //超时时间 ,如果超时,休眠100毫秒,再继续获取超时锁
	while(true)
	{
		if(mutex.try_lock_for(timeout)) 
		{
			std::cout<<std::this_thread::get_id() << ":do work with the mutex" << std::endl;
			std::chrono::milliseconds sleepDuration(250);
			std::this_thread::sleep_for(sleepDuration);
			mutex.unlock();
			std::this_thread::sleep_for(sleepDuration);
		}
		else
		{
			std::cout << std::this_thread::get_id() << ": do work without mutex" << std::endl;
			std::chrono::milliseconds sleepDuration(100);
			std::this_thread::sleep_for(sleepDuration);
		}
	}
}  

/ 条件变量: C++11提供两种条件变量:condition_variable配合std::unique_lock 进行wait操作 condition_variable_any 和任意带有lock、unlock语义的mutex搭配使用,比较灵活,但效率比condition_variable差一些 1.拥有条件变量的线程获取互斥量 2.循环检查某个条件,如果条件不满足,则阻塞直到条件满足,如果条件满足,则向下执行 3.某个线程满足条件执行完之后调用notify_one或者notify_all唤醒一个或者所有的等待线程。 /

/ 这个同步队列在没有满的情况下,可以插入数据,如果满了,则会调用m_notFull阻塞等待,待消费线程取出数据之后发一个未满的通知 ,然后前面阻塞的线程就会唤醒继续往下执行;如果队列为空,就不能取数据,会调用m_notEmpty条件变量阻塞,等待插入数据的线程发出不为空 的通知时,才能往下继续执行 /

#include <mutex>
#include <thread>
#include <condition_variable>
template<typename T>
class SyncQueue
{
	bool isFull() const
	{
		return m_queue.size() == m_maxSize;
	}
	bool isEmpty() const
	{
		return m_queue.empty();
	}
public:
	SyncQueue(int maxSize) : m_maxSize(maxSize)
	{
	}
	void Put(const T& x)
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		while(isFull())
		{
			cout << "缓存区满了,需要等待"<<endl;          //while循环等同于 m_notFull.wait(locker,[this]{return !isFull();});
			m_notFull.wait(m_mutex);                        //条件变量会先检查判断式是否满足条件,如果满足条件,则重新获取mutex,然后结束wait
		}                                                   //,继续往下执行;如果不满足条件,释放mutex,将线程置为waiting状态,继续等待
		m_queue.push_back(x);
		m_notEmpty.notify_one();
	}
	void Take(T& x)
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		while(isEmpty())
		{
			cout<<"缓存区空了,需要等待" <<endl;
			m_notEmpty.wait(m_mutex);
		}
		x = m_queue.front();
		m_queue.pop_front();
		m_notFull.notify_one();
	}
	bool Empty()
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		return m_queue.empty();
	}
	bool Full()
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		return m_queue.size() == m_maxSize;
	}
	size_t Size()
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		return m_queue.size();
	}
	int Count()
	{
		return m_queue.size();
	}
private:
	std::list<T> m_queue;  //缓冲区
	std::mutex m_mutex;
	std::condition_variable_any m_notEmpty; //不为空的条件变量
	std::condition_variable_any m_notFull;  //没有满的条件变量
	int m_maxSize;//同步队列最大的size
}

###把std::lock_guard改成std::unique_lock,把std::condition_variable_any改为condition_variable

#include <thread>
#include <condition_variable>
#include <mutex>
#include <list>
#include <iostream>
using namespace std;
template <typename T>
class SimpleSyncQueue
{
public:
	SimpleSyncQueue()
	{
	}
	~SimpleSyncQueue();
	
	void Put(const T& x)
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		m_queue.push_back(x);
		m_notEmpty.notify_one();
	}
	void Take(T& x)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		m_notEmpty.wait(locker,[this]{return !m_queue.empty()});
		x = m_queue.front();
		m_queue.pop_front();
	}
	bool Empty()
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		return m_queue.empty();
	}
	size_t Size()
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		return m_queue.size();
	}
private:
	std::list<T> m_queue;
	std::mutex m_mutex;
	std::condition_variable m_notEmpty;

###//用mutex实现的计时器

struct Counter
{
	int value;
	std::mutex mutex;
	void increment()
	{
		std::lock_guard<std::mutex> lock(mutex);
		++value;
	}
	void decrement()
	{
		std::lock_guard<std::mutex> lock(mutex);
		--value;
	}
	int get()
	{
		return value;
	}
};

###//用原子变量实现的计时器就不需要互斥变量

#include <atomic>
struct AtomicCounter
{
	std::atomic<int> value;
	void increment()
	{
		++value;
	}
	void decrement()
	{
		--value;
	}
	int get()
	{
		return value.load();
	}
};

###//call_once/once_flag的使用 保证多线程中的函数只被调用一次

#include <iostream>
#include <thread>
#include <mutex>
std::once_flag flag;
void do_once()
{
	std::call_once(flag,[](){std::cout << "called once" << endl;});
}
int main()
{
	std::thread t1(do_once);
	std::thread t2(do_once);
	t1.join();
	t2.join();
	return 0;
}

//获取线程函数返回值的类 std::future //get_future : 返回具有和承诺相同的“关联的异步状态”的将来的对象 //通过查询 future 的状态来或取异步操作的结果,future_status有三种状态:Deferred:还没开始,Ready:异步操作已经完成,Timeout:异步操作超时 //查询frture状态

std::future_status status;
do{
	status = future.wait_for(std::chrono::seconds(1));
	if(status == std::future_status::deferred)
	{
		std::cout << "未开始\n" << endl;
	}
	else if(status == std::future_status::timeout)
	{
		std::cout << "操作超时\n" << endl;
	}
	else if(status == std::future_status::ready)
	{
		std::cout << "完成\n" << endl;
	}
}while(status != std::future_status::ready);
//获取future结果3种方式:get\wait\wait_for.get是等待异步操作结束并返回结果。
//wait只是等待异步操作完成,wait_for是超时等待返回结果

//协助线程赋值的类std::promise //为获取线程函数中的某个值提供便利,在线程函数中为外面传进来的promise赋值,在线程函数执行完成之后就可以通过promise的future获取该值。 //取值是间接的通过promise内部提供的futrue来获取的

std::promise<int> pr;
std::thread t([](std::promise<int>& p){p.set_value_at_thread_exit(9);},std::ref(pr));
std::future<int> f = pr.get_futrue();
auto r = f.get();

//可调用对象的包装类std::packaged_task //包装了一个可调用对象的包装类,将函数和future绑定,保存一个函数。 //std::ref 用于包装按引用传递的值。 //std::cref 用于包装按const 引用传递的值。

std::packaged_task<int ()> task([](){return 7;});
std::thread t1(std::ref(task));
std::future<int> f1 = task.get_futrue();
auto r1 = f1.get();

//std::packaged_task和std::promise,内部都有一个future以便访问异步操作结果,std::packaged_task中包装的是一个异步操作,std::promise包装的 //是一个值,因为有时候获取线程中的某个值,用std::promise。有时候需要获一个异步操作的返回值,用std::packaged_task。 //可以将std::packaged_task异步操作的值保存在std::promise。 //future被promise和packaged_task用来作为异步操作或者异步结果保存到std::promise,用std::future和std::shared_future来获取调用的结果。 //feture是不可以拷贝,只能移动,shared_future是可以拷贝的,当需要将future放到容器中则需要用shared_future。

#include <iostream>
#include <utility>  //以帮助构建和管理的对象, <utility>标题将自动包含通过 <map> 要帮助管理其键/值对键入的元素。
#include <future>
#include <thread>
int func (int x)
{
	return x+2;
}
int main()
{
	std::packaged_task<int (int)> tak(func);
	std::future<int> fut = tak.get_futrue(); //获取feture
	std::thread(std::move(tak),2).detach();  //task作为线程函数,std::move是将一个左值强制转化为右值引用,继而通过右值引用该值
	//detach调用之后,目标线程就成为了守护线程,驻留后台运行,与之关联的std::thread对象失去对目标线程的关联,
	//无法再通过std::thread对象取得该线程的控制权。当线程主函数执行完之后,线程就结束了,运行时库负责清理与该线程相关的资源
	int value = fut.get();//等待task完成并且或许结果
	std::cout << "this future is "  << value << endl;
	std::vector<std::shared_future<int>> v;
	auto f = std::async(std::launch::async,[](int a,int b){return a+b;},2,3);
	v.push_back(f);
	std::cout << "the shared_future result is " << v[0].get() <<endl;
	return 0;
}
//this result is 4
//the shared_future result is 5

//std::async直接创建异步的task,异步操作返回的结果也保存在future中,当需要获取异步操作结果时,只需要调用future.get(); //要是不想要结果,future.wait()即可。 //std::async(模式,线程函数,线程函数的参数….);模式:std::launch::async:调用async的时候开始创建线程, //std::launch::deferred:延迟加载方式创建线程,直到调用future的get或者wait才创建线程

std::future<int> f1 = std::async(std::launch::async,[](){return 8;});
cout << f1.get() << endl;   //output :  8
std::future<int> f2 = std::async(std::launch::async,[](){cout << 8 << endl;});
f2.wait();  //output:8
std::future<int> future = std::async(std::launch::async,[](){std::this_thread::sleep_for(std::chrono::seconds(3));
									return 8;
});
std::cout  << "waiting..." << endl;
std::future_status status;
do{
	status = future.wait_for(std::chrono::seconds(1));
	if(status == std::future_status::deferred)
	{
		std::cout<<"未开始\n"<<endl;
	}
	else if(status == std::future_status::timeout)
	{
		std::cout<<"超时\n"<<endl;
	}
	else if(status == std::future_status::ready)
	{
		std::cout<<"完成\n"<<endl;
	}
}while(status != std::future_status::ready);
std::cout<<"result is"<<future.get()<<"\n";