../
用C++模拟Mesa Monitor ===================== 2019-06-20 Monitor(管程)是并发程序的同步方式之一。Monitor至少有两类,Mesa monitor和Hoare monitor。Mesa monitor在notify之后会继续运行,Hoare monitor在notify之后会进行 context switch,来到wait的地方开始运行,所以在写wait的时候,Mesa monitor需要这 样: while (locked) wait(); 但是Hoare Monitor只需要这样: if (locked) wait(); 目前还是Mesa Monitor最为常见。 实现 monitor 需要语言层面的支持。Java有`synchronized`关键字,可以用来实现 monitor,但是C++就没有了,不过还是可以用condition variable和RAII,来模拟Mesa monitor。 #include <mutex> #include <condition_variable> class Monitor{ public: Monitor():lk{m, std::defer_lock}{} void notify(){cv.notify_one();} void broadcast(){cv.notify_all();} template<typename F> void wait(F pred){cv.wait(lk, pred);}; std::unique_lock<std::mutex> synchronize() { return std::unique_lock<std::mutex>{m}; } private: std::mutex m; std::unique_lock<std::mutex> lk; std::condition_variable cv; }; 来看一个简单的例子,用monitor实现互斥锁。虽然这里例子没什么实际意义,但是足够简 单: // To compile: g++ -std=c++14 -lpthread MonitorLock.cpp #include "Monitor.h" #include <thread> #include <iostream> using namespace std; class MonitorLock{ public: void lock() { // unique_lock 会通过 RAII 自动 unlock auto lk = m.synchronize(); m.wait(&; locked = true; } void unlock() { auto lk = m.synchronize(); locked = false; m.notify(); } private: Monitor m; bool locked = false; }; int main() { MonitorLock m; thread t1{[&](){ for (int i = 1; i <= 30; i++){ m.lock(); cout << "t1: " << i << endl; m.unlock(); } }}; thread t2{[&](){ for (int i = 1; i <= 30; i++){ m.lock(); cout << "t2: " << i << endl; m.unlock(); } }}; t1.join(); t2.join(); return 0; } 另一个例子稍微实用一点,解决生产者消费者问题。 // To compile: g++ -std=c++14 -lpthread ProducerConsumer.cpp #include "Monitor.h" #include <thread> #include <iostream> #include <queue> using namespace std; template<typename T, int N> class ProducerConsumer{ public: void insert(T& item) { auto lk = m.synchronize(); // if(!full) m.wait(& < N;}); items.push(item); if(items.size() == 1){ m.notify(); } cout << "insert: " << item << endl; } T remove() { auto lk = m.synchronize(); m.wait(& > 0;}); // if(!empty) auto item = items.front(); items.pop(); if(items.size() == N-1){ m.notify(); } cout << "consume: " << item << endl; return item; } private: Monitor m; std::queue<T> items; }; int main() { ProducerConsumer<int, 10> q; thread p{[&](){ for(int i = 1; i < 30; i++){ q.insert(i); } }}; thread c{[&](){ for(int i = 1; i < 30; i++){ auto item = q.remove(); } }}; p.join(); c.join(); return 0; } 上面的例子只适用于单生产者单消费者问题,如果要解决多生产者多消费者问题,一种做 法是设置一个 threshold: // insert() if (items.size() >= comsumerThreshold) m.broadcast(); // remove() if(items.size() <= producerThreshold) m.broadcast() 或者更细粒度的控制condition variable的使用: // To compile: g++ -std=c++14 -lpthread ProducerConsumer.cpp #include <thread> #include <iostream> #include <queue> #include <mutex> #include <condition_variable> using namespace std; template<typename T, int N> class ProducerConsumer{ public: void insert(T& item) { std::unique_lock<std::mutex> lk{m}; insert_cv.wait(lk, & < N;}); // if(!full) items.push(item); // 如果这里是Hoare monitor就会跳转到正在wait的remove函数, // 可惜这里是mesa remove_cv.notify_one(); cout << "insert: " << item << endl; } T remove() { std::unique_lock<std::mutex> lk{m}; remove_cv.wait(lk, & > 0;}); // if(!empty) auto item = items.front(); items.pop(); insert_cv.notify_one(); cout << "consume: " << item << endl; return item; } private: mutex m; condition_variable insert_cv, remove_cv; std::queue<T> items; }; int main() { ProducerConsumer<int, 10> q; thread p{[&](){ for(int i = 1; i < 30; i++){ q.insert(i); } }}; thread c{[&](){ for(int i = 1; i < 30; i++){ auto item = q.remove(); } }}; p.join(); c.join(); return 0; } -------------------------------------------------------------------- Email: i (at) mistivia (dot) com