打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
一种同步消息队列模型 -- C++
    在已故巨匠Stevens先生中的<<网络编程卷2>>中有记载使用互斥锁和条件变量来解决生产者/消费者的方法,在多线程编程中,我们也常常需要解决这样的生产者消费者问题。在实际项目中,我见到过很多种解决类似问题的同步消息队列,有的复杂而优雅,有得简陋而实用。
    对于生产者,如果不考虑内存和队列的大小问题,只需要一直往消息队列里推消息就可以了。而对于消费者就要复杂一点了,在消息队列取空后,消费者可以循环轮询队列,直到取到新的消息,这种方式编码简单,但是浪费CPU时间片; 更加优雅的方法是当消息队列为空时,将消费者挂起,等到有消息可读时再唤醒。
    这里引用Stevens先生的方法,使用互斥锁和信号量实现了一个简单的同步消息队列模型。

#include <deque>
#include <sys/types.h>
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <stdio.h>

using namespace std;

template<class DataType>
class Message_Queue
{
public:
    
//构造函数,初始化2个互斥锁和1个条件变量

     Message_Queue():_nready(0)
    {
     pthread_mutex_init(&_mutex, NULL);
     pthread_mutex_init(&_ready_mutex, NULL);
     pthread_cond_init(&_cond, NULL);
    }

    
//推消息

    int push_msg(DataType &d)
    {
        
//加锁_mutex,向queue里推消息

        pthread_mutex_lock(&_mutex); 
         _queue.push_back(d);
        pthread_mutex_unlock(&_mutex);

        
//加锁_ready_mutex,判断是否需要唤醒挂起的消费者

        pthread_mutex_lock(&_ready_mutex);
        if (!_nready)
            pthread_cond_signal(&_cond); 
//唤醒阻塞在此消息队列上的消费者

         _nready++; 
//计数器++

        pthread_mutex_unlock(&_ready_mutex);
        
        return 0;        
    }
    
//取消息

    int get_msg(DataType &d)
    {
        
//加锁,查看计数器,看是否需要挂起

        pthread_mutex_lock(&_ready_mutex);
        while (_nready == 0) //计数器为0,队列为空,挂起
              pthread_cond_wait(&_cond, &_ready_mutex); 
//为空时,所有消费者会阻塞在此

        
//当被生产者唤醒时,消费者重新加_ready_mutex锁,_nready > 0, 程序跳出while(_nready)循环继续运行

        _nready--;
        pthread_mutex_unlock(&_ready_mutex);
        
        
//加锁,取队列操作

        pthread_mutex_lock(&_mutex); 
         d = _queue.front();
         _queue.pop_front();
        pthread_mutex_unlock(&_mutex);

        return 0;
    }

private:
    pthread_cond_t     _cond;
    int                 _nready;
    pthread_mutex_t     _ready_mutex;
    pthread_mutex_t     _mutex;
    deque<DataType> _queue;
};


//测试程序,模拟N个生产者和M个消费者使用消息队列同时工作的情况。

void *consume(void *arg) 
//消费者线程

{
     Message_Queue<int> *queue = (Message_Queue<int> *)arg;
    int     i;
    for (; ; )
    {
        sleep(1);
        printf("[%u]ready to get_msg\n", pthread_self());
        queue->get_msg(i);
        printf("[%u]get msg = %d\n", pthread_self(), i);
    }

}
void *produce(void *arg) 
//生产者线程

{
     Message_Queue<int> *queue = (Message_Queue<int> *)arg;
    int     i;
    for (; ; )
    {
        sleep(1);
        printf("[%u]ready to push_msg\n", pthread_self());
        queue->push_msg(i);
        printf("[%u]push_msg finished\n", pthread_self());
         i++;
    }
}

int main()
{

     Message_Queue<int> msg_queue;

    int     n ;
    int c = 2;
    int p = 3;
    pthread_t     k;
    printf("create %d consume................\n", c);
    for (n=0; n< c; n++)
    {
        pthread_create(&k, NULL, consume, &msg_queue);
    }
    printf("create %d produce.............\n", p);
    for (n=0; n< p; n++)
    {
        pthread_create(&k, NULL, produce, &msg_queue);
    }

     pause();
}

    这样,我们对STL的deque 进行一个简单的封装,就获得了一个可用的同步消息队列。这里需要提及的是,一次pthread_cond_signal操作会唤醒所有阻塞在此条件变量上的消费者,而所有阻塞在pthread_cond_wait上的消费者被唤醒,这就是所谓“惊群”。而被唤醒的所有消费者都会去竞争_ready_mutex锁,这就是所谓“二次竞争”。这样的“惊群”和“二次竞争”在各个平台上是否存在,对性能的影响有多大,我没有验证过,只是一种猜测,不过相信这样的模型满足一般的应用程序应该没有问题。

本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
POSIX 线程详解
linux多线程下无界缓冲区的生产者消费者问题
linux下C实现多线程
Mongoose源码剖析:mongoose的工作模型
一个Linux下C线程池的实现
剖析Java中阻塞队列的实现原理及应用场景
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服