在已故巨匠Stevens先生中的<<网络编程卷2>>中有记载使用互斥锁和条件变量来解决生产者/消费者的方法,在多线程编程中,我们也常常需要解决这样的生产者消费者问题。在实际项目中,我见到过很多种解决类似问题的同步消息队列,有的复杂而优雅,有得简陋而实用。
对于生产者,如果不考虑内存和队列的大小问题,只需要一直往消息队列里推消息就可以了。而对于消费者就要复杂一点了,在消息队列取空后,消费者可以循环轮询队列,直到取到新的消息,这种方式编码简单,但是浪费CPU时间片; 更加优雅的方法是当消息队列为空时,将消费者挂起,等到有消息可读时再唤醒。
这里引用Stevens先生的方法,使用互斥锁和信号量实现了一个简单的同步消息队列模型。
#include <deque> #include <sys/types.h> #include <stdio.h> #include <unistd.h> #include <pthread.h> #include <stdio.h>
using namespace std;
template<class DataType> class Message_Queue { public: //构造函数,初始化2个互斥锁和1个条件变量
Message_Queue():_nready(0) { pthread_mutex_init(&_mutex, NULL); pthread_mutex_init(&_ready_mutex, NULL); pthread_cond_init(&_cond, NULL); }
//推消息
int push_msg(DataType &d) { //加锁_mutex,向queue里推消息
pthread_mutex_lock(&_mutex); _queue.push_back(d); pthread_mutex_unlock(&_mutex);
//加锁_ready_mutex,判断是否需要唤醒挂起的消费者
pthread_mutex_lock(&_ready_mutex); if (!_nready) pthread_cond_signal(&_cond); //唤醒阻塞在此消息队列上的消费者
_nready++; //计数器++
pthread_mutex_unlock(&_ready_mutex); return 0; } //取消息
int get_msg(DataType &d) { //加锁,查看计数器,看是否需要挂起
pthread_mutex_lock(&_ready_mutex); while (_nready == 0) //计数器为0,队列为空,挂起 pthread_cond_wait(&_cond, &_ready_mutex); //为空时,所有消费者会阻塞在此
//当被生产者唤醒时,消费者重新加_ready_mutex锁,_nready > 0, 程序跳出while(_nready)循环继续运行
_nready--; pthread_mutex_unlock(&_ready_mutex); //加锁,取队列操作
pthread_mutex_lock(&_mutex); d = _queue.front(); _queue.pop_front(); pthread_mutex_unlock(&_mutex);
return 0; }
private: pthread_cond_t _cond; int _nready; pthread_mutex_t _ready_mutex; pthread_mutex_t _mutex; deque<DataType> _queue; };
//测试程序,模拟N个生产者和M个消费者使用消息队列同时工作的情况。
void *consume(void *arg) //消费者线程
{ Message_Queue<int> *queue = (Message_Queue<int> *)arg; int i; for (; ; ) { sleep(1); printf("[%u]ready to get_msg\n", pthread_self()); queue->get_msg(i); printf("[%u]get msg = %d\n", pthread_self(), i); }
} void *produce(void *arg) //生产者线程
{ Message_Queue<int> *queue = (Message_Queue<int> *)arg; int i; for (; ; ) { sleep(1); printf("[%u]ready to push_msg\n", pthread_self()); queue->push_msg(i); printf("[%u]push_msg finished\n", pthread_self()); i++; } }
int main() {
Message_Queue<int> msg_queue;
int n ; int c = 2; int p = 3; pthread_t k; printf("create %d consume................\n", c); for (n=0; n< c; n++) { pthread_create(&k, NULL, consume, &msg_queue); } printf("create %d produce.............\n", p); for (n=0; n< p; n++) { pthread_create(&k, NULL, produce, &msg_queue); }
pause(); }
|
这样,我们对STL的deque 进行一个简单的封装,就获得了一个可用的同步消息队列。这里需要提及的是,一次pthread_cond_signal操作会唤醒所有阻塞在此条件变量上的消费者,而所有阻塞在pthread_cond_wait上的消费者被唤醒,这就是所谓“惊群”。而被唤醒的所有消费者都会去竞争_ready_mutex锁,这就是所谓“二次竞争”。这样的“惊群”和“二次竞争”在各个平台上是否存在,对性能的影响有多大,我没有验证过,只是一种猜测,不过相信这样的模型满足一般的应用程序应该没有问题。
本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请
点击举报。