打开APP
userphoto
未登录

开通VIP,畅享免费电子书等14项超值服

开通VIP
Ubuntu14.04+Beanstalkd1.9最佳实践

目录

[TOC]

1、基本概念

1.1、什么是Beanstalkd?

  Beanstalkd 是一个轻量级消息中间件,它最大特点是将自己定位为基于管道 (tube) 和任务 (job) 的工作队列。
  Beanstalkd 支持任务优先级 (priority), 延时 (delay), 超时重发 (time-to-run) 和预留 (buried), 能够很好的支持分布式的后台任务和定时任务处理。它的内部采用libevent,服务器-客户端之间采用类似Memcached的轻量级通讯协议,因此性能很高(enque: 9000 jobs/second, worker: 5200 jobs/second)
  尽管是内存队列, Beanstalkd 提供了 binlog 机制, 当重启 beanstalkd 时,当前任务状态能够从纪录的本地 binlog 中恢复。Beanstalkd支持过有9.5 million用户的Facebook Causes应用。后来开源,现在有PostRank大规模部署和使用,每天处理百万级任务。Beanstalkd是典型的类Memcached设计,协议和使用方式都是同样的风格,所以使用过Memcached的用户会觉得Beanstalkd似曾相识。

  Beanstalkd支持的语言有很多,可以参考这里:https://github.com/kr/beanstalkd/wiki/client-libraries

1.2、Beanstalkd设计的核心概念?

  • job

  一个需要异步处理的任务,是Beanstalkd中的基本单元,需要放在一个tube中。

  • tube

  一个有名的任务队列,用来存储统一类型的job,是producer和consumer操作的对象。

  • producer

  Job的生产者,通过put命令来将一个job放到一个tube中。

  • consumer

  Job的消费者,通过reserve/release/bury/delete命令来获取job或改变job的状态。

1.3、Beanstalkd中Job的生命周期介绍

  Beanstalkd的流程如下图:

  

  • 当producer直接put一个job时,job就处于READY状态,等待consumer来处理,如果选择延迟put,job就先到DELAYED状态,等待时间过后才迁移到READY状态。
  • consumer获取了当前READY的job后,该job的状态就迁移到RESERVED,这样其他的consumer就不能再操作该job。
  • 当consumer完成该job后,可以选择delete, release或者bury操作;
  • delete之后,job从系统消亡,之后不能再获取
  • release操作可以重新把该job状态迁移回READY(也可以延迟该状态迁移操作),使其他的consumer可以继续获取和执行该job
  • bury操作,可以把该job休眠,等到需要的时候,再将休眠的job kick回READY状态,也可以delete buride状态的job

  正是有这些有趣的操作和状态,才可以基于此做出很多意思的应用,比如要实现一个循环队列,就可以将RESERVED状态的job休眠掉,等没有READY状态的job时再将BURIED状态的job一次性kick回READY状态。

1.4、Beanstalkd有什么不足?

  Beanstalkd 没有提供主备同步 + 故障切换机制, 在应用中有成为单点的风险。实际应用中,可以用数据库为任务 (job) 提供持久化存储。 和 Memcached 类似, Beanstalkd 依赖 libevent 的单线程事件分发机制, 不能有效利用多核 cpu 的性能。这一点可以通过单机部署多个实例克服。

  

  一个Beanstalkd尚无提供删除一个tube的操作,只能将tube的job依次删除,并让Beanstalkd来自动删除空tube。还有就是Beanstalkd不支持客户端认证机制(开发者将应用场景定位在局域网)。

  Beanstalk速度非常快,协议简单,占用内存空间少,而且支持持久化。唯一的不足是挂了之后恢复慢,3G日志数据恢复了十多分钟。

2、Beanstalkd的官网在哪里?

  http://kr.github.io/beanstalkd/

3、Beanstalkd在哪里下载?

  http://kr.github.io/beanstalkd/download.html

4、如何安装Beanstalkd?

  使用下面的命令进行安装,同时查看版本:

lion@node1:~$ sudo apt-get install beanstalkdlion@node1:~$ beanstalkd -vbeanstalkd 1.9

  Beanstalkd可以使用以下命令停止和启动:

lion@node1:~$ sudo service beanstalkd stop  #停止lion@node1:~$ sudo service beanstalkd start #启动

  通过apt-get安装后的配置文件目录在/etc/default/beanstalkd,里面描述了Beanstalkd监听的地址和端口

lion@node1:~$ cat /etc/default/beanstalkd## Defaults for the beanstalkd init script, /etc/init.d/beanstalkd on## Debian systems.BEANSTALKD_LISTEN_ADDR=127.0.0.1BEANSTALKD_LISTEN_PORT=11300# You can use BEANSTALKD_EXTRA to pass additional options. See beanstalkd(1)# for a list of the available options. Uncomment the following line for# persistent job storage.#BEANSTALKD_EXTRA="-b /var/lib/beanstalkd"

4.1、安装beanstool管理工具

  beanstool是一个Beanstalkd管理工具,不需要任何依赖就可以使用,是用Golang编写的。

lion@node1:~/_app$ wget https://github.com/src-d/beanstool/releases/download/v0.2.0/beanstool_v0.2.0_linux_amd64.tar.gzlion@node1:~/_app$ tar -xvzf beanstool_v0.2.0_linux_amd64.tar.gzlion@node1:~/_app$ sudo cp beanstool_v0.2.0_linux_amd64/beanstool /usr/local/bin/

5、Beanstalkd调用案例

  我们是使用Golang语言来调用Beanstalkd的,在官网上可以看到有现成的Golang包Beanstalk,使用下面的命令先安装Beanstalk

lion@node1:~$ go get github.com/kr/beanstalk

5.1、使用默认的Tube发送、接收消息

  producer_default.go(Produce jobs):

package mainimport (  "fmt"  "github.com/kr/beanstalk"  "time"  "log")const (  //uri  uri           =  "127.0.0.1:11300"  //network  network =  "tcp"  //  content = "hello idoall.org")//如果存在错误,则输出func failOnError(err error, msg string) {  if err != nil {    log.Fatalf("%s: %s", msg, err)    panic(fmt.Sprintf("%s: %s", msg, err))  }}func main(){  conn, err := beanstalk.Dial(network, uri)  failOnError(err, "Failed to connect to beanstalkd")  defer conn.Close()  //把job放入tube中并设置优先级  //  id, err := conn.Put(    []byte(content), //内容    1,    //优先级  0~2^32 个优先级, 0 代表最高优先级,默认优先级为1024。     0,    //要延迟发送的时间,0表示不延迟     time.Minute)  failOnError(err, "Failed puts a job into tube")  fmt.Println("job", id)}

  consumer_default.go:

package mainimport (  "fmt"  "github.com/kr/beanstalk"  "time"  "log")const (  //uri  uri           =  "127.0.0.1:11300"  //network  network =  "tcp")//如果存在错误,则输出func failOnError(err error, msg string) {  if err != nil {    log.Fatalf("%s: %s", msg, err)    panic(fmt.Sprintf("%s: %s", msg, err))  }}func main(){  conn, err := beanstalk.Dial(network, uri)  failOnError(err, "Failed to connect to beanstalkd")  defer conn.Close()  //  id, body, err := conn.Reserve(1 * time.Second)  failOnError(err, "returns a job from one of the tubes failed")  fmt.Println("job", id)  fmt.Println(string(body))  err = conn.Delete(id)  failOnError(err, "delete jobs failed")}




  查看结果


  Console1(Procuder):

lion@node1:~/_code/_beanstalk/_golang$ go run producer.gojob 1

  使用beanstool工具查看队列状态:

lion@node1:~/_app$ beanstool stats+---------+----------+----------+----------+----------+----------+----------+----------+| Name    | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |+---------+----------+----------+----------+----------+----------+----------+----------+| default | 0        | 0        | 1        | 0        | 1        | 0        | 1        |+---------+----------+----------+----------+----------+----------+----------+----------+

  Console2(Consumer):

lion@node1:~/_code/_beanstalk/_golang$ go run consumer_default.gojob 1hello idoall.org

5.2、使用指定的Tube发送、接收消息

  producer_tube_idoall.go(Produce jobs):

package mainimport (  "fmt"  "github.com/kr/beanstalk"  "time"  "log")const (  //uri  uri           =  "127.0.0.1:11300"  //network  network =  "tcp"  //  tubeName = "test-idoall-tube"  //  content = "hello idoall.org from idoall tube")//如果存在错误,则输出func failOnError(err error, msg string) {  if err != nil {    log.Fatalf("%s: %s", msg, err)    panic(fmt.Sprintf("%s: %s", msg, err))  }}func main(){  conn, err := beanstalk.Dial(network, uri)  failOnError(err, "Failed to connect to beanstalkd")  defer conn.Close()  tube := &beanstalk.Tube{conn, tubeName}  //把job放入tube中并设置优先级  //  id, err := tube.Put(    []byte(content), //内容    1,    //优先级  0~2^32 个优先级, 0 代表最高优先级,默认优先级为1024。     0,    //要延迟发送的时间,0表示不延迟     time.Minute)  failOnError(err, "Failed puts a job into tube")  fmt.Println("job", id)  conn.Close()}

  consumer_tube_idoall.go:

package mainimport (  "fmt"  "github.com/kr/beanstalk"  "time"  "log")const (  //uri  uri           =  "127.0.0.1:11300"  //network  network =  "tcp"  //  tubeName = "test-idoall-tube")//如果存在错误,则输出func failOnError(err error, msg string) {  if err != nil {    log.Fatalf("%s: %s", msg, err)    panic(fmt.Sprintf("%s: %s", msg, err))  }}func main(){  conn, err := beanstalk.Dial(network, uri)  failOnError(err, "Failed to connect to beanstalkd")  defer conn.Close()  tubeSet := beanstalk.NewTubeSet(conn, tubeName)  //  id, body, err := tubeSet.Reserve(10 * time.Hour)  failOnError(err, "returns a job from one of the tubes failed")  fmt.Println("job", id)  fmt.Println(string(body))  err = conn.Delete(id)  failOnError(err, "delete jobs failed")}




  查看结果


  Console1(Procuder):

lion@node1:~/_code/_beanstalk/_golang$ go run producer_tube_idoall.gojob 5

  使用beanstool工具查看队列状态:

lion@node1:~/_app$ beanstool stats+------------------+----------+----------+----------+----------+----------+----------+----------+| Name             | Buried   | Delayed  | Ready    | Reserved | Urgent   | Waiting  | Total    |+------------------+----------+----------+----------+----------+----------+----------+----------+| default          | 0        | 0        | 0        | 0        | 0        | 0        | 4        || test-idoall-tube | 0        | 0        | 1        | 0        | 1        | 0        | 1        |+------------------+----------+----------+----------+----------+----------+----------+----------+

  Console2(Consumer):

lion@node1:~/_code/_beanstalk/_golang$ go run consumer_tube_idoall.gojob 5hello idoall.org from idoall tube

5.3、Beanstalkd的消息持久化

  持久化消息非常简单,在启动的时候使用-b参数,指定目录:

lion@node1:~$ /usr/bin/beanstalkd -l 127.0.0.1 -p 11300 -b /home/lion/tmp

  可以参考文章使用Supervisor3.2.1基于Mac10.10.3对系统进程进行管理将Beanstalkd的命令做为系统进程,在开机的时候随系统启动。

6、参考资料

  官方示例





博文作者:迦壹
博客地址:Ubuntu14.04+Beanstalkd1.9最佳实践
转载声明:可以转载, 但必须以超链接形式标明文章原始出处和作者信息及版权声明,谢谢合作!



本站仅提供存储服务,所有内容均由用户发布,如发现有害或侵权内容,请点击举报
打开APP,阅读全文并永久保存 查看更多类似文章
猜你喜欢
类似文章
【热】打开小程序,算一算2024你的财运
Beanstalkd消息队列 -- php类Pheanstalk使用
Python使用 Beanstalkd 做异步任务处理的方法
rabbitmq消息队列——发布订阅
docker快速安装rabbitmq
Go语言学习笔记(八)golang 操作 Redis & Mysql & RabbitMQ
微服务实践之分布式定时任务
更多类似文章 >>
生活服务
热点新闻
分享 收藏 导长图 关注 下载文章
绑定账号成功
后续可登录账号畅享VIP特权!
如果VIP功能使用有故障,
可点击这里联系客服!

联系客服