使用beanstalk实现简单的任务调度系统

上一篇文章介绍了beanstalk的特点,并用go语言写了个简单的demo。但是在实际开发中这么写代码肯定是不行的,因为要考虑扩展性,比如之后beanstalk增加tube以及不同的任务处理函数。我们下面就实现一种非常简单的任务调度系统框架。其实不光是beanstalk,几乎所有的消息队列都需要考虑这些问题。

说明:文章中用的是beanstalk的golang api:github.com/kr/beanstalk。参考后文。

框架

一般消息队列处理都是采用生产者/消费者模型,生产者向队列中放数据,消费者从队列中取数据并处理。

生产者

beanstalk中的任务基于tube来区分,同一个tube下的任务可以通过同一个消费者来消费。生产者很好写。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func Produce(addr, tubeName string, job []byte, delay time.Duration) (uint64, error) {
if tubeName == "" {
return 0, errors.New("tube name is empty")
}

c, err := beanstalk.Dial("tcp", addr)
if err != nil {
return 0, err
}

c.Tube.Name = tubeName
c.TubeSet.Name[tubeName] = true

//c.Put将job放入到队列中,对应的参数分别为任务,优先级,延时时间,处理任务时间
//返回job id
id, err = c.Put(job, 0, delay, 10*time.Second)
if err != nil {
//error handle
}
return id, err
}

消费者

针对不同tube下的任务,消费者可能是不一样的,理论上针对每个tube都需要有个consume?理论上是这样,但是如果这么写代码中会有很多重复的地方,比如连接beanstalk,异常处理。我们把这些共同的部分提取出来作为consume框架。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func Consume(addr, tubeName string) {
if fname == ""{
return
}

c, err := beanstalk.Dial("tcp", addr)
if err != nil {
return
}
defer c.Close()

c.Tube.Name = tubeName
c.TubeSet.Name[tubeName] = true

for {
//c.Reserve()从队列中取得任务,但是为了防止某个consume长时间占用,设置了timeout
id, body, err := c.Reserve(1 * time.Second)
//根据job不同执行不同的处理函数
}

注册函数

消费函数可以根据tube来区分,不同的tube对应不同的处理函数,我们可以把tube和处理函数的对应关系存储下来。

1
2
3
4
5
6
var handleFunc = make(map[string]func(job []byte) error)

//register
func RegisterHandler(tubeName string, handler func([]byte) error) {
handleFunc[tubeName] = handler
}

汇总

把上面代码汇总一下就得到下面的consume结构。

1
2
3
4
5
6
7
8
9
10
11
12
func Consume(addr, tubeName string) {
...

for {
//c.Reserve()从队列中取得任务,但是为了防止某个consume长时间占用,设置了timeout
id, body, err := c.Reserve(1 * time.Second)
if func, ok := handleFunc[tubeName]; !ok {
//error handle
}

func(body) //处理
}

任务

作为一个任务调度系统,至少要满足下面三点:

  1. 定时任务
  2. 延时任务
  3. 重复任务

当然延时任务也可以归为定时任务。延时/定时任务可以用beanstalk很容易实现,重复任务其实也可以。job状态流转图如下。

1
2
定时/延时 `DELAY`-->`READY`-->`RESERVE`-->`DELETE`
重复任务 `DELAY`-->`READY`-->`RESERVE`-->`DELAY` 重复N池

下面扩充consume代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func Consume(fname, handleMethod, addr, tubeName string) {
...
for {
id, body, err := c.Reserve(1 * time.Second)
if err != nil {
//error handle
continue
}
// 处理已注册的消费者
go func() {
if func, ok := handleFunc[tubeName]; !ok {
//error handle
}

err = func(body) //处理
if err == nil {
if DELAY_JOB {
err = c.Delete(id)
} else if REPEATE_JOB {
err = c.Release(id)
}
}
}()

参考

beanstalk go api

1
2
3
4
5
6
7
8
9
10
// Put puts a job into tube t with priority pri and TTR ttr, and returns the id of the newly-created job
func (t *Tube) Put(body []byte, pri uint32, delay, ttr time.Duration) (id uint64, err error)
// Delete deletes the given job.
func (c *Conn) Delete(id uint64) error {
// Release tells the server to perform the following actions:
// set the priority of the given job to pri, remove it from the list of
// jobs reserved by c, wait delay seconds, then place the job in the
// ready queue, which makes it available for reservation by any client.
func (c *Conn) Release(id uint64, pri uint32, delay time.Duration) error
func (c *Conn) Bury(id uint64, pri uint32) error