引言
简单地说:beanstalk 和常规的消息队列比起来最大的特点是支持延时任务处理。类似 Kafka 中的 topic,beanstalk 中叫做 tube。消息的分发模式可以认为是有且仅有一次。关于 beanstalkd 的更多细节可以参考我 之前的文章《beanstalk初探》 。
数据结构
源码中关于各种数据结构的定义都在文件 dat.h 中,有 tube, job, Conn 的封装等。
Tube
tube 的定义如下
1 | struct tube { |
其中各个字段的意义如下:
refs:类似 C++ 智能指针的引用计数,用于 GC。
name: tube 的名字,MAX_TUBE_NAME_LEN 为 201,也就是说 tube name 最长为200个字符。
ready:ready 的 job,以最小堆来存放。
delay:delay 的 job,存放类似 ready。
waiting:消费 tube 的 conn 列表,ms.items 是单链表。oninsert 和 onremove 是操作 items 的方法,除了把 conn 加入 链表,还做了引用计数。代码如下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21static void
on_watch(ms a, tube t, size_t i)
{
tube_iref(t);
t->watching_ct++;
}
static void
on_ignore(ms a, tube t, size_t i)
{
t->watching_ct--;
tube_dref(t);
}
...
Conn *
make_conn(int fd, char start_state, tube use, tube watch)
{
...
ms_init(&c->watch, (ms_event_fn) on_watch, (ms_event_fn) on_ignore);
...
}stat:job 的各个状态的个数统计。
1
2
3
4
5
6
7
8
9struct stats {
uint urgent_ct;
uint waiting_ct;
uint buried_ct;
uint reserved_ct;
uint pause_ct;
uint64 total_delete_ct;
uint64 total_jobs_ct;
};using_ct:表示 tube 被多少个 conn 监听,感觉有点像冗余信息,源码里面并没有真正使用。
watching_ct:类似 using_ct,在 conn 加入 waiting 链表的时候 +1,删除的时候 -1。
pause:标志,在命令 “pause-tube” 的时候设置。
deadline_at:tube 中的最近的 job 过期时间。
buries:tube 中处于 bury 状态的 job 双向链表的表头。
Job
1 | struct job { |
Job 结构里面包含了 Jobrec 结果。Jobrec 各个字段含义如下:
- id:id。
- pri:优先级。
- delay:delay 时间。
- ttr:job 处理的超时时间。
- body_size:job 的 body 大小,body 信息存储在 job 的 body 字段。
- 其他字段主要是一些和 job 状态(Ready,delay, reserve, release)相关的字段。
- state:job 状态(Ready,Reserved,Buried,Delayed)。
关于 job 有几点可以说的:
- 全局变量数组 all_jobs 存储所有的 job,经过 hash 存储。
- tube 中的 buried job 和 Conn 中的 reservered_jobs 是双向链表形式。
- tube 的 delay 和 ready job 队列使用堆来存储。
也就是说 Job 有两种存储形式,各为不同的需求吧。job 的各个字段含义
- pad:占位。
- tube:job 所属的 tube。
- prev, next:双向链表中的 prev 和 next job。
- ht_next:hash 存储时处理碰撞的方式是开链法,ht_next 表示开链时的下一个 job。
- heap_index:堆索引(堆使用数组来实现)。
- file,fnext,fprev:文件相关。
- reserver:消费这个 job 的 Conn,如果 job 没有被消费,reserver = NULL
- walresv,walused:文件相关。
- body:job 的具体信息。
Conn
除了 job 和 tube 外,beanstalkd 还对 conn 进行了封装,定义如下,不打算细说了。
1 | struct Conn { |