一定一定要更具业务特点来划分出不同的队列,不能讲所有的任务都让同一队列消费。场景,工程当中有统计类的异步任务,也有发送红包给微信用户的异步任务。若不划分队列,那么当统计类的异步任务很多很多的时候,发送红包给微信用户的异步任务就要等待一段时间才能被执行到(任务多可能会等待十几分钟甚至一两个小时),这对用户体验来说是非常不友好的。此时就可以将队列划分为low, middle, high这几种队列,对于统计类的这些任务可以扔去low的队列中,而对于发送红包这种重要的任务就扔去high队列中,确保尽可能快的被执行到。具体划分,需要更加业务场景来划分。
通过-Q参数来指定队列名:
python celery worker -A test_project -Q low;python celery worker -A test_project -Q middle;python celery worker -A test_project -Q high;# django代理中设置CELERY_QUEUES = ( Queue('low', Exchange('low', type='direct')), Queue('middle', Exchange('middle', type='direct')), Queue('high', Exchange('high', type='direct')),)
对任务指定队列运行
方式1@app.task(bind=True, queue='middle', name='send_wx_text')def send_wx_text(self, target_origin_id, to_user, txt): ...... 方式2send_wx_text.apply_async((target_origin_id, to_user, txt), queue='middle')方式3CELERY_ROUTES = { 'send_wx_text': { 'queue': 'middle', },}
大部分异步任务都是不需要关心处理结果的,此时不需要将处理结果返回。由于返回处理结果,需要存储,这意味着有IO操作,会降低性能,若将结果存储在redis或者RabbitMQ 中,随着任务数的增加,消耗的内存也会增加(曾经因为这个原因爆过内存.....)。若非要存储结果,也要设置好结果的保存时间,以免内存泄漏。
设置例子
# 设置结果的保存时间CELERY_TASK_RESULT_EXPIRES = 10*60# 设置默认不存结果CELERY_IGNORE_RESULT = True# 任务单独指定, 会覆盖全局配置@app.task(bind=True, ignore_result=True)def send_wx_text(self, target_origin_id, to_user, txt): ......
对开启的进程数做过测试,发现当进程数等于cpu核心数时,性能是最好的,少了不能很好的利用多核,多了性能下降估计是因为开多的进程带来的性能的提升小于因上下文切换带来的性能损失。对于协程池的大小设置,需要进行测试来得出最佳数量。
例子:
[program:test_project-high-jobs]command=/opt/.virtualenvs/test_project/bin/celery worker -A test_project -P gevent -c 100 -l INFO -Q high -n %%h-test_project-high --without-gossip --without-mingle --without-heartbeatenvironment=PATH="/opt/.virtualenvs/test_project/bin"directory=/srv/test_project.com/applicationuser=uwsgipriority=10autorestart=trueprocess_name=TEST_PROJECT_%(process_num)s;进程数==核心数numprocs=4stdout_logfile=/var/log/supervisor/test_project-high-jobs.logstderr_logfile=/var/log/supervisor/test_project-high-jobs.log
利用gevent来部署worker, 若task使用了那些不支持gevent的库(例如requests的普通模式),会使到异步变同步,极其影响性能。这一点对于其它的协程库也是一样的例如eventlet。
重试需要注意的点
简单的例子
def backoff(attempts): """ 1, 2, 4, 8, 16, 32, ... """ return 2 ** attempts# 设置max_retries, 限定重试次数@app.task(bind=True, max_retries=3)def send_wx_text(self, target_origin_id, to_user, txt): try: r, err = wx_api.send_text(to_user, txt) if err and int(err.code) in (45047): # 更具重试次数增加重试的延时的时间 raise self.retry(args=[target_origin_id, to_user, txt], countdown=backoff(self.request.tretries)) except BaseException as e: raise e
对于一些爬虫类的异步任务,我们常常是使用代理去爬,一般来说代理服务商提供的代理能用的概率比较低,容易导致timeout,对于这些可能会失败的任务应该尽快让它失败而不是在等待它失败,避免做无畏的等待,进而提高性能。预估一下任务的正常处理时间,根据这个时间来规定一个任务的最大执行时间。
简单的例子
@app.task(bind=True, max_retries=3, soft_time_limit=5)def send_wx_text(self, target_origin_id, to_user, txt): .....
任务失败或者重试,一般来说是需要进行记录,若每一个任务都写一遍相关的代码,这非常麻烦。此时,通过执行任务的base参数对任务进行统一处理。
例子
from myproject.tasks import app# 具体的可以看看Task类中的方法class BaseTask(app.Task): abstract = True def on_retry(self, exc, task_id, args, kwargs, einfo): sentrycli.captureException(exc) super(BaseTask, self).on_retry(exc, task_id, args, kwargs, einfo) def on_failure(self, exc, task_id, args, kwargs, einfo): sentrycli.captureException(exc) super(BaseTask, self).on_failure(exc, task_id, args, kwargs, einfo) @app.task(bind=True, max_retries=3, soft_time_limit=5, ignore_result=True, base=BaseTask)def send_wx_text(self, target_origin_id, to_user, txt): .......
理由:
场景:工程A利用tornado接收微信转发过来的用户信息,但信息的处理是在使用Django的工程B中。也就是说,由工程A做消息的生产者,工程B作为消息的消费者。若工程A,B的broker都是一样的,那么只需要在工程A设置一个同名的异步任务并指定exchange, routing_key即可。若broker不一致, 则先指定broker,指定exchange, 指定routing_key。原理就是,amqp协议中,只要指定了exchange和routing_key,就可以将任务分发到绑定的队列当中了。celery库做了封装,会更具queue来指定exchange和routing_key。
相同broker的例子
#工程B中的代码@app.task(bind=True, name='send_wx_text',max_retries=3, soft_time_limit=5, ignore_result=True, base=BaseTask)def send_wx_text(self, target_origin_id, to_user, txt): # 含有具体实现 ......# 工程A中的代码from celery.app.task import Tasksend_wx_text = Task()send_wx_text.name='send_wx_text'send_wx_text.apply_async([target_origin_id, to_user, txt],exchange='test', routing_key='test')
不同broker的例子
#工程B中的代码@app.task(bind=True, name='send_wx_text',max_retries=3, soft_time_limit=5, ignore_result=True, base=BaseTask)def send_wx_text(self, target_origin_id, to_user, txt): # 含有具体实现 ......# 工程A中的代码from celery.app.task import Taskfrom celery import Celeryapp = Celery(borker='redis://127.0.0.1:6379/15')send_wx_text = Task()send_wx_text.bind(app)send_wx_text.name='send_wx_text'send_wx_text.apply_async([target_origin_id, to_user, txt], exchange='test', routing_key='test')
小礼物走一走,来简书关注我
联系客服