beanstalk初探

beanstalk是什么

Beanstalk是一个轻量级的消息队列,典型的类Memcache设计,协议和使用方式都是同样的风格。Beanstalk的应用场景主要有:

  1. 消息异步处理,这个和其他消息队列没有区别。
  2. 消息延迟处理,类似crontab,比crontab优点是支持并发。

beanstalkd的定位很简单:基于管道(tube)的任务(job)队列。典型的生产者-消费者模型。

beanstalkd架构

beanstalkd核心概念

  • job: 任务,队列中的基本单元
  • tube: beanstalkd中可以保护多个任务队列,不同队列之间通过tube来区分
  • producer: 生产者,创建job
  • consumer: 消费者,消费job
  • priority:任务优先级,内部使用堆来实现,所以每次取job的时间复杂度为O(logn)

job状态流转图

任务队列中的job有多个状态,只有处于READY状态的Job才能被消费。job的状态流转图如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/*
typical job lifecycle:

put reserve delete
-----> [READY] ---------> [RESERVED] --------> *poof*

more possibilities:

put with delay release with delay
----------------> [DELAYED] <------------.
| |
| (time passes) |
| |
put v reserve | delete
-----------------> [READY] ---------> [RESERVED] --------> *poof*
^ ^ | |
| \ release | |
| `-------------' |
| |
| kick |
| |
| bury |
[BURIED] <---------------'
|
| delete
`--------> *poof*
*/

Producer创建job的时候可以选择两种方式:put, put with delay。对于第一种方式创建的job,状态直接就为READY,消费者可以直接消费。第二种put with delay方式创建的job处于状态DELAYED,等待delay时间之后job状态转变为READY。在消费者消费job的过程中,job状态转变为RESERVED,这样job又变成不可消费的了。这两种创建job的方式都会传入一个TTR(Timeout机制),当job处于RESERVED状态时,TTR开始倒计时。如果TTR倒计时完,job状态还没有变,就可以认为job处理失败,会被重新放回队列。
job被消费之后可以重新放回队列(RELEASE: put和put with delay两种方式),这样就可以实现job的循环,类似crontab。job被消费之后也可以直接删除(DELETE)。除了RELEASE和DELETE,job还可以进入BURY状态。所谓BURY状态,是一种脱离正常流程的状态(不会自动转变为READY,也就不会被消费),我们可以手动把job kick回READY队列,或者DELETE。为什么要有BURY状态,我理解这是一种异常捕获机制。比如对于前面处理超时的Job,我们就可以将其设置为BURY状态,这样有两个好处:1.避免主流程再次进入到异常;2.后期人工排查错误。

beanstalk分布式

beanstalk的分布式处理和memcached一样:由client来做,server之间不互相感知。memcached一种常用的方式是一致性hash算法。

beanstalkd使用

安装

Mac环境安装非常简单,使用brew就可以了。

1
brew install beanstalk

启动

1
beanstalkd  //默认启动在11300端口

Go Demo

使用golang实现一个简单的producer和consumer,需要用到包github.com/kr/beanstalk

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//producer.go
import (
"strconv"
"github.com/kr/beanstalk"
)

func main() {
c, _ := beanstalk.Dial("tcp", "127.0.0.1:11300")
c.Tube.Name = "legendtkl"
c.TubeSet.Name["legendtkl"] = true

for i := 0; i < 10; i++ {
msg := strconv.Itoa(i)
id, err := c.Put([]byte(msg), 1, 60*time.Second, 10*time.Second)
if err != nil {
//error handle
}
}
}

//consumer.go
func main() {
c, _ := beanstalk.Dial("tcp", "127.0.0.1:11300")
c.Tube.Name = "legendtkl"
c.TubeSet.Name["legendtkl"] = true

for {
id, body, err := c.Reserve(5*time.Second)
if err != nil {
//error handle
}
fmt.Println(id)
fmt.Println(string(body))
}
}