beanstalk是什么
Beanstalk是一个轻量级的消息队列,典型的类Memcache设计,协议和使用方式都是同样的风格。Beanstalk的应用场景主要有:
- 消息异步处理,这个和其他消息队列没有区别。
- 消息延迟处理,类似crontab,比crontab优点是支持并发。
beanstalkd的定位很简单:基于管道(tube)的任务(job)队列。典型的生产者-消费者模型。
beanstalkd架构
beanstalkd核心概念
- job: 任务,队列中的基本单元
- tube: beanstalkd中可以保护多个任务队列,不同队列之间通过tube来区分
- producer: 生产者,创建job
- consumer: 消费者,消费job
- priority:任务优先级,内部使用堆来实现,所以每次取job的时间复杂度为O(logn)
job状态流转图
任务队列中的job有多个状态,只有处于READY
状态的Job才能被消费。job的状态流转图如下。
1 | /* |
Producer创建job的时候可以选择两种方式:put, put with delay。对于第一种方式创建的job,状态直接就为READY
,消费者可以直接消费。第二种put with delay方式创建的job处于状态DELAYED
,等待delay时间之后job状态转变为READY
。在消费者消费job的过程中,job状态转变为RESERVED
,这样job又变成不可消费的了。这两种创建job的方式都会传入一个TTR(Timeout机制),当job处于RESERVED
状态时,TTR开始倒计时。如果TTR倒计时完,job状态还没有变,就可以认为job处理失败,会被重新放回队列。
job被消费之后可以重新放回队列(RELEASE
: put和put with delay两种方式),这样就可以实现job的循环,类似crontab。job被消费之后也可以直接删除(DELETE)。除了RELEASE和DELETE,job还可以进入BURY
状态。所谓BURY状态,是一种脱离正常流程的状态(不会自动转变为READY,也就不会被消费),我们可以手动把job kick回READY
队列,或者DELETE
。为什么要有BURY
状态,我理解这是一种异常捕获机制。比如对于前面处理超时的Job,我们就可以将其设置为BURY
状态,这样有两个好处:1.避免主流程再次进入到异常;2.后期人工排查错误。
beanstalk分布式
beanstalk的分布式处理和memcached一样:由client来做,server之间不互相感知。memcached一种常用的方式是一致性hash算法。
beanstalkd使用
安装
Mac环境安装非常简单,使用brew就可以了。
1 | brew install beanstalk |
启动
1 | beanstalkd //默认启动在11300端口 |
Go Demo
使用golang实现一个简单的producer和consumer,需要用到包github.com/kr/beanstalk
。
1 | //producer.go |