Our system's backend processing is modeled on memcached: data sending is driven by libevent's event notification, while business-side receiving uses Linux (pthread) condition variables. Since the two mechanisms are interchangeable, we ran a rough benchmark of their relative efficiency. Partial source code follows:
Prediction: the condition-variable mechanism is implemented directly by the Linux threading layer, whereas libevent's event notification requires binding the receiving thread to an event and then passing the notification through a pipe, so the condition variable should be somewhat faster.
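For context, the snippets below share some global state that the post does not show. The following is a minimal sketch of those declarations; the names come from the code below, while the struct layout and exact types are an assumption modeled on memcached's LIBEVENT_THREAD:

```cpp
#include <pthread.h>
#include <queue>
#include <event2/event.h>

// Per-worker context; field names follow the snippets below, layout is an
// assumption modeled on memcached's LIBEVENT_THREAD.
struct LIBEVENT_THREAD
{
    pthread_t          thread_id;         // worker thread id
    struct event_base* base;              // event loop owned by the worker
    struct event*      notify_event;      // read event on the pipe
    int                notify_receive_fd; // pipe read end
    int                notify_send_fd;    // pipe write end
};

static LIBEVENT_THREAD* threads;                                    // single worker context
static std::queue<int>  q1;                                         // shared work queue
static pthread_mutex_t  mutexQueue     = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t   gCondReadInner = PTHREAD_COND_INITIALIZER;
```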
1. Test code: pushes data onto the queue, then notifies either by writing to the pipe or by broadcasting on the condition variable:
```cpp
static void* test1(void* args)
{
    int i = 1;
    char buff[1] = {'m'};
    for (; i < 100000001; ++i)  // push 100 million items
    {
        pthread_mutex_lock(&mutexQueue);
        q1.push(i);
        pthread_mutex_unlock(&mutexQueue);
        // When testing the pipe, use write(); when testing the condition
        // variable, use broadcast instead -- never both at once.
        if (write(threads->notify_send_fd, buff, 1) != 1)
        {
            LOG_ERROR("Write Pipe Failed!");
        }
        //pthread_cond_broadcast(&gCondReadInner);
    }
    return NULL;
}
```
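A side note on the commented-out broadcast: this test has a single consumer thread (ReadWait, section 4), so `pthread_cond_signal` would be the more conventional wake-up, since broadcast wakes every waiter. A minimal sketch of that alternative, not what the original test used:

```cpp
// Hypothetical variant: wake exactly one waiter instead of all of them.
// Signaling while holding the mutex keeps the push and the wake-up ordered.
pthread_mutex_lock(&mutexQueue);
q1.push(i);
pthread_cond_signal(&gCondReadInner);
pthread_mutex_unlock(&mutexQueue);
```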
2. Creating the libevent event and the notification pipe
```cpp
{
    threads = static_cast<LIBEVENT_THREAD*>(calloc(1, sizeof(LIBEVENT_THREAD)));
    threads->base = event_base_new();
    int fds[2] = {-1, -1};
    if (pipe(fds))
    {
        LOG_ERROR("Create Pipe Failed!");
        return FAILED;
    }
    threads->notify_receive_fd = fds[0];
    threads->notify_send_fd = fds[1];
    // Create the read event on the pipe's receive end
    threads->notify_event = event_new(threads->base, threads->notify_receive_fd,
                                      EV_READ | EV_PERSIST, pipefunc, threads);
    if (NULL == threads->notify_event)
    {
        LOG_ERROR("Create Notify Event Failed!");
        return FAILED;
    }
    // Register the event (NULL timeout: stays pending until triggered)
    if (-1 == event_add(threads->notify_event, NULL))
    {
        LOG_ERROR("Add Notify Event Failed!");
        return FAILED;
    }
    // Setup done; start the event-loop worker thread
    int iRet = 0;
    if ((iRet = pthread_create(&threads->thread_id, NULL, CLinkMgr::WorkerLibevent, (void*)threads)) != 0)
    {
        LOG_ERROR("Create Thread Failed! -- %d", iRet);
        return FAILED;
    }
}
```
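The post does not show `CLinkMgr::WorkerLibevent`; for this test it only needs to run the worker's event loop. A plausible minimal sketch, an assumption modeled on memcached's `worker_libevent`:

```cpp
// Hypothetical sketch of the worker entry point referenced above; assumes
// WorkerLibevent is declared as a static member of CLinkMgr so its signature
// matches what pthread_create expects.
void* CLinkMgr::WorkerLibevent(void* arg)
{
    LIBEVENT_THREAD* me = static_cast<LIBEVENT_THREAD*>(arg);
    event_base_dispatch(me->base);  // blocks, running pipefunc on each notify
    return NULL;
}
```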
3. The pipe-receive callback, executed on the libevent worker thread
```cpp
static void pipefunc(int fd, short which, void* arg)
{
    char buff[1] = {0};
    if (read(fd, buff, 1) != 1)
    {
        LOG_ERROR("Read Text Failed!");
        return;
    }
    static int count = 0;
    pthread_mutex_lock(&mutexQueue);
    int x = q1.front();
    q1.pop();
    pthread_mutex_unlock(&mutexQueue);
    (void)x;  // the payload itself is not used in this test
    count++;
    if (count == 1)
    {
        LOG_ERROR("++++++++++++++++++++++ Count Start");
    }
    else if (count >= 100000000)
    {
        LOG_ERROR("++++++++++++++++++++++ Count End!, count = %d", count);
    }
}
```
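One reason the pipe path is slower: each queued item costs a 1-byte `write` plus a 1-byte `read`, i.e. two system calls per item. As a hypothetical variant (not part of the measured test), the callback could drain many pending notifications per `read` to amortize that cost:

```cpp
// Hypothetical batched variant: one read() may consume many 1-byte
// notifications at once, amortizing the syscall cost over the batch.
static void pipefunc_batched(int fd, short which, void* arg)
{
    char buff[64];
    ssize_t n = read(fd, buff, sizeof(buff));
    for (ssize_t i = 0; i < n; ++i)
    {
        pthread_mutex_lock(&mutexQueue);
        int x = q1.front();
        q1.pop();
        pthread_mutex_unlock(&mutexQueue);
        (void)x;  // process the item here
    }
}
```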
4. The condition-variable reader implementation
```cpp
static void* ReadWait(void* args)
{
    // Allow this thread to be cancelled...
    (void)pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
    // ...and let cancellation take effect immediately
    (void)pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
    while (1)
    {
        pthread_mutex_lock(&mutexQueue);
        while (q1.empty())
        {
            pthread_cond_wait(&gCondReadInner, &mutexQueue);
        }
        int x = q1.front();
        q1.pop();
        pthread_mutex_unlock(&mutexQueue);
        if (x == 1)
        {
            LOG_ERROR("++++++++++++++++++++++ Count Start");
        }
        else if (x >= 100000000)
        {
            LOG_ERROR("++++++++++++++++++++++ Count End!, count = %d", x);
        }
    }
    return NULL;
}
```
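A caveat on the cancellation setup above: with `PTHREAD_CANCEL_ASYNCHRONOUS`, the thread can be cancelled while it holds `mutexQueue`, leaving the mutex locked forever. A safer sketch (an alternative, not what the test used) keeps the default deferred cancellation and pairs the lock with a cleanup handler:

```cpp
// Hypothetical safer variant: deferred cancellation only fires at
// cancellation points such as pthread_cond_wait(), and the cleanup handler
// releases the mutex if the thread is cancelled mid-wait.
static void UnlockQueue(void* arg)
{
    pthread_mutex_unlock(static_cast<pthread_mutex_t*>(arg));
}

static void* ReadWaitSafe(void* args)
{
    (void)pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
    while (1)
    {
        pthread_mutex_lock(&mutexQueue);
        pthread_cleanup_push(UnlockQueue, &mutexQueue);
        while (q1.empty())
        {
            pthread_cond_wait(&gCondReadInner, &mutexQueue);  // cancellation point
        }
        int x = q1.front();
        q1.pop();
        pthread_cleanup_pop(1);  // runs UnlockQueue, releasing the mutex
        (void)x;                 // process the item here
    }
    return NULL;
}
```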
Finally, the driver that starts the test threads and runs the event loop:

```cpp
// Producer (test) thread
pthread_t tid;
(void)pthread_create(&tid, NULL, test1, NULL);
/*
// Condition-variable variant: create the waiting reader thread...
pthread_t tid1;
(void)pthread_create(&tid1, NULL, ReadWait, NULL);
// ...plus the same producer thread
pthread_t tid;
(void)pthread_create(&tid, NULL, test1, NULL);
*/
// Run the event loop (only needed for the pipe variant)
(void)event_base_dispatch(threads->base);
```
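The timings below were taken as the interval between the "Count Start" and "Count End" log lines. An explicit equivalent, shown only as an illustrative sketch, would capture a monotonic clock at each marker and log the difference:

```cpp
#include <time.h>

// Illustrative helper (not in the original code): take a monotonic timestamp
// at "Count Start" and again at "Count End", then report the delta.
static double MonotonicSeconds(void)
{
    struct timespec ts;
    clock_gettime(CLOCK_MONOTONIC, &ts);
    return static_cast<double>(ts.tv_sec) + ts.tv_nsec / 1e9;
}
```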
Test results

| Items pushed | libevent pipe-event time | Condition-variable time |
|---|---|---|
| 10 million | 7-8 s | 2-3 s |
| 100 million | 87 s | 27 s |
In summary, condition-variable inter-thread communication is roughly three times as fast as libevent's pipe-based event notification (27 s vs 87 s for 100 million items), which matches our prediction.
Perhaps this is why memcached uses the libevent pipe notification only on its receive path, while sending calls the system send functions directly.
The test also shows that both mechanisms process items at a rate of a million or more per second (roughly 1.1 million/s via the pipe and 3.7 million/s via the condition variable at the 100-million mark), so neither is a bottleneck at our system's current concurrency level.