⬑
用C++模拟Mesa Monitor
2019-06-20Monitor(管程)是并发程序的同步方式之一。Monitor至少有两类,Mesa monitor和Hoare monitor。Mesa monitor在notify之后会继续运行,Hoare monitor在notify之后会进行context switch,来到wait的地方开始运行,所以在写wait的时候,Mesa monitor需要这样:
while (locked)
wait();
但是Hoare Monitor只需要这样:
if (locked)
wait();
目前还是Mesa Monitor最为常见。
实现 monitor 需要语言层面的支持。Java有synchronized
关键字,可以用来实现monitor,但是C++就没有了,不过还是可以用condition variable和RAII,来模拟Mesa monitor。
#include <mutex>
#include <condition_variable>
class Monitor{
public:
Monitor():lk{m, std::defer_lock}{}
void notify(){cv.notify_one();}
void broadcast(){cv.notify_all();}
template<typename F>
void wait(F pred){cv.wait(lk, pred);};
std::unique_lock<std::mutex> synchronize()
{
return std::unique_lock<std::mutex>{m};
}
private:
std::mutex m;
std::unique_lock<std::mutex> lk;
std::condition_variable cv;
};
来看一个简单的例子,用monitor实现互斥锁。虽然这里例子没什么实际意义,但是足够简单:
// To compile: g++ -std=c++14 -lpthread MonitorLock.cpp
#include "Monitor.h"
#include <thread>
#include <iostream>
using namespace std;
class MonitorLock{
public:
void lock()
{
auto lk = m.synchronize(); // unique_lock 会通过 RAII 自动 unlock
m.wait([&](){return !locked;});
locked = true;
}
void unlock()
{
auto lk = m.synchronize();
locked = false;
m.notify();
}
private:
Monitor m;
bool locked = false;
};
int main()
{
MonitorLock m;
thread t1{[&](){
for (int i = 1; i <= 30; i++){
m.lock();
cout << "t1: " << i << endl;
m.unlock();
}
}};
thread t2{[&](){
for (int i = 1; i <= 30; i++){
m.lock();
cout << "t2: " << i << endl;
m.unlock();
}
}};
t1.join();
t2.join();
return 0;
}
另一个例子稍微实用一点,解决生产者消费者问题。
// To compile: g++ -std=c++14 -lpthread ProducerConsumer.cpp
#include "Monitor.h"
#include <thread>
#include <iostream>
#include <queue>
using namespace std;
template<typename T, int N>
class ProducerConsumer{
public:
void insert(T& item)
{
auto lk = m.synchronize();
m.wait([&](){return items.size() < N;}); // if(!full)
items.push(item);
if(items.size() == 1){
m.notify();
}
cout << "insert: " << item << endl;
}
T remove()
{
auto lk = m.synchronize();
m.wait([&](){return items.size() > 0;}); // if(!empty)
auto item = items.front();
items.pop();
if(items.size() == N-1){
m.notify();
}
cout << "consume: " << item << endl;
return item;
}
private:
Monitor m;
std::queue<T> items;
};
int main()
{
ProducerConsumer<int, 10> q;
thread p{[&](){
for(int i = 1; i < 30; i++){
q.insert(i);
}
}};
thread c{[&](){
for(int i = 1; i < 30; i++){
auto item = q.remove();
}
}};
p.join();
c.join();
return 0;
}
上面的例子只适用于单生产者单消费者问题,如果要解决多生产者多消费者问题,一种做法是设置一个 threshold:
// insert()
if (items.size() >= comsumerThreshold)
m.broadcast();
// remove()
if(items.size() <= producerThreshold)
m.broadcast()
或者更细粒度的控制condition variable的使用:
// To compile: g++ -std=c++14 -lpthread ProducerConsumer.cpp
#include <thread>
#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>
using namespace std;
template<typename T, int N>
class ProducerConsumer{
public:
void insert(T& item)
{
std::unique_lock<std::mutex> lk{m};
insert_cv.wait(lk, [&](){return items.size() < N;}); // if(!full)
items.push(item);
// 如果这里是Hoare monitor就会跳转到正在wait的remove函数,
// 可惜这里是mesa
remove_cv.notify_one();
cout << "insert: " << item << endl;
}
T remove()
{
std::unique_lock<std::mutex> lk{m};
remove_cv.wait(lk, [&](){return items.size() > 0;}); // if(!empty)
auto item = items.front();
items.pop();
insert_cv.notify_one();
cout << "consume: " << item << endl;
return item;
}
private:
mutex m;
condition_variable insert_cv, remove_cv;
std::queue<T> items;
};
int main()
{
ProducerConsumer<int, 10> q;
thread p{[&](){
for(int i = 1; i < 30; i++){
q.insert(i);
}
}};
thread c{[&](){
for(int i = 1; i < 30; i++){
auto item = q.remove();
}
}};
p.join();
c.join();
return 0;
}
Email: i (at) mistivia (dot) com