beanstalkd 源码剖析:数据结构

引言

简单地说:beanstalk 和常规的消息队列比起来最大的特点是支持延时任务处理。类似 Kafka 中的 topic,beanstalk 中叫做 tube。消息的分发模式可以认为是有且仅有一次。关于 beanstalkd 的更多细节可以参考我 之前的文章《beanstalk初探》

数据结构

源码中关于各种数据结构的定义都在文件 dat.h 中,有 tube, job, Conn 的封装等。

Tube

tube 的定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
struct tube {
uint refs;
char name[MAX_TUBE_NAME_LEN];
Heap ready;
Heap delay;
struct ms waiting; /* set of conns */
struct stats stat;
uint using_ct;
uint watching_ct;
int64 pause;
int64 deadline_at;
struct job buried;
};

struct ms {
size_t used, cap, last;
void **items;
ms_event_fn oninsert, onremove;
};

typedef void(*ms_event_fn)(ms a, void *item, size_t i);

其中各个字段的意义如下:

  • refs:类似 C++ 智能指针的引用计数,用于 GC。
  • name: tube 的名字,MAX_TUBE_NAME_LEN 为 201,也就是说 tube name 最长为200个字符。
  • ready:ready 的 job,以最小堆来存放。
  • delay:delay 的 job,存放类似 ready。
  • waiting:消费 tube 的 conn 列表,ms.items 是单链表。oninsert 和 onremove 是操作 items 的方法,除了把 conn 加入 链表,还做了引用计数。代码如下。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    static void
    on_watch(ms a, tube t, size_t i)
    {

    tube_iref(t);
    t->watching_ct++;
    }

    static void
    on_ignore(ms a, tube t, size_t i)
    {

    t->watching_ct--;
    tube_dref(t);
    }
    ...
    Conn *
    make_conn(int fd, char start_state, tube use, tube watch)
    {

    ...
    ms_init(&c->watch, (ms_event_fn) on_watch, (ms_event_fn) on_ignore);
    ...
    }
  • stat:job 的各个状态的个数统计。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    struct stats {
    uint urgent_ct;
    uint waiting_ct;
    uint buried_ct;
    uint reserved_ct;
    uint pause_ct;
    uint64 total_delete_ct;
    uint64 total_jobs_ct;
    };
  • using_ct:表示 tube 被多少个 conn 监听,感觉有点像冗余信息,源码里面并没有真正使用。

  • watching_ct:类似 using_ct,在 conn 加入 waiting 链表的时候 +1,删除的时候 -1。
  • pause:标志,在命令 “pause-tube” 的时候设置。
  • deadline_at:tube 中的最近的 job 过期时间。
  • buries:tube 中处于 bury 状态的 job 双向链表的表头。

Job

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
struct job {
Jobrec r; // persistent fields; these get written to the wal

/* bookeeping fields; these are in-memory only */
char pad[6];
tube tube;
job prev, next; /* linked list of jobs */
job ht_next; /* Next job in a hash table list */
size_t heap_index; /* where is this job in its current heap */
File *file;
job fnext;
job fprev;
void *reserver;
int walresv;
int walused;

char body[]; // written separately to the wal
};

struct Jobrec {
uint64 id;
uint32 pri;
int64 delay;
int64 ttr;
int32 body_size;
int64 created_at;
int64 deadline_at;
uint32 reserve_ct;
uint32 timeout_ct;
uint32 release_ct;
uint32 bury_ct;
uint32 kick_ct;
byte state;
};

Job 结构里面包含了 Jobrec 结果。Jobrec 各个字段含义如下:

  • id:id。
  • pri:优先级。
  • delay:delay 时间。
  • ttr:job 处理的超时时间。
  • body_size:job 的 body 大小,body 信息存储在 job 的 body 字段。
  • 其他字段主要是一些和 job 状态(Ready,delay, reserve, release)相关的字段。
  • state:job 状态(Ready,Reserved,Buried,Delayed)。

关于 job 有几点可以说的:

  • 全局变量数组 all_jobs 存储所有的 job,经过 hash 存储。
  • tube 中的 buried job 和 Conn 中的 reservered_jobs 是双向链表形式。
  • tube 的 delay 和 ready job 队列使用堆来存储。

也就是说 Job 有两种存储形式,各为不同的需求吧。job 的各个字段含义

  • pad:占位。
  • tube:job 所属的 tube。
  • prev, next:双向链表中的 prev 和 next job。
  • ht_next:hash 存储时处理碰撞的方式是开链法,ht_next 表示开链时的下一个 job。
  • heap_index:堆索引(堆使用数组来实现)。
  • file,fnext,fprev:文件相关。
  • reserver:消费这个 job 的 Conn,如果 job 没有被消费,reserver = NULL
  • walresv,walused:文件相关。
  • body:job 的具体信息。

Conn

除了 job 和 tube 外,beanstalkd 还对 conn 进行了封装,定义如下,不打算细说了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
struct Conn {
Server *srv;
Socket sock;
char state;
char type;
Conn *next;
tube use;
int64 tickat; // time at which to do more work
int tickpos; // position in srv->conns
job soonest_job; // memoization of the soonest job
int rw; // currently want: 'r', 'w', or 'h'
int pending_timeout;
char halfclosed;

char cmd[LINE_BUF_SIZE]; // this string is NOT NUL-terminated
int cmd_len;
int cmd_read;

char *reply;
int reply_len;
int reply_sent;
char reply_buf[LINE_BUF_SIZE]; // this string IS NUL-terminated

// How many bytes of in_job->body have been read so far. If in_job is NULL
// while in_job_read is nonzero, we are in bit bucket mode and
// in_job_read's meaning is inverted -- then it counts the bytes that
// remain to be thrown away.
int in_job_read;
job in_job; // a job to be read from the client

job out_job;
int out_job_sent;

struct ms watch;
struct job reserved_jobs; // linked list header
};