0%

Ganglia 源码剖析之 gmond

0. 引言

前面几篇文章从使用和特性方面介绍了 ganglia,我们这篇文章直接来看一下 ganglia 的核心 gmond 的源码。

1. 主流程

gmond 的主流程主要包括几个部分:

  • 命令行参数解析
  • 解析处理配置文件
  • 载入指标采集 module
  • 创建一个进程处理 tcp 请求
  • 循环处理指标采集、发送、接收

我们会在文章的之后几节依次进行介绍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
int
main ( int argc, char *argv[] )
{
// 命令行参数解析
if (cmdline_parser (argc, argv, &args_info) != 0)
exit(EXIT_FAILURE);

/* Builds a default configuration based on platform */
build_default_gmond_configuration((Ganglia_pool)global_context);

// 处理配置文件
process_configuration_file();

// 载入 metric collect module
load_metric_modules();

// 建立 metric name 和 handler 之间的对应关系
setup_metric_callbacks();

// 创建 tcp server
/* Create TCP listener thread */
if(!deaf)
{
apr_thread_t *thread;
if (apr_thread_create(&thread, NULL, tcp_listener, NULL, global_context) != APR_SUCCESS)
{
err_msg("Failed to create TCP listener thread. Exiting.\n");
exit(EXIT_FAILURE);
}
}

// 循环处理指标的采集、发送、收集等。
for (;!done;) {
...
}
}

2. metric 采集

metric 采集要说的主要由两个部分:load module 和 metric collect。先来看一下 load module。

2.1 module load

要说 module 载入,需要先说一下配置文件结构。关于配置文件中的 module 配置具体说明可以参考文件 monitor-core/gmond/conf.pod 的说明。正常 modules 节点下至少有一个 modulemodule 的属性包括:

  • name: module 的名字,比如说 python module 的话对应的就是文件名
  • language: module 类型,可以是 C/C++, 也可以是 Python。如果没有,则默认为 C/C++。
  • enable: 开启标志。没有配置则默认为 yes。
  • path: module 的 path。载入的时候会根据 path 去查找。
  • params: metric_init 函数的参数,可以为空。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
modules {
module {
name = "example_module"
language = "C/C++"
enabled = yes
path = "modexample.so"
params = "An extra raw parameter"
param RandomMax {
value = 75
}
param ConstantValue {
value = 25
}
}
}

module 的具体载入过程在函数 load_metric_modules 中实现。所有的 module 会被组成一个链表,表头保存在全局变量 metric_modules 中。通过下面的载入过程我们可以知道,并不是 module 定义了就会载入的,而是根据 conf 来决定载入的 module 的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
static void
load_metric_modules( void )
{
tmp = cfg_getsec( config_file, "modules");
for (j = 0; j < cfg_size(tmp, "module"); j++)
{
cfg_t *module = cfg_getnsec(tmp, "module", j);
...

modLanguage = cfg_getstr(module, "language");
...

modEnabled = cfg_getbool(module, "enabled");
...

modPath = cfg_getstr(module, "path");
...

modName = cfg_getstr(module, "name");
modparams = cfg_getstr(module, "params");
...

/*
* Load the file into the gmond address space
*/
if (apr_dso_load(&modHandle, modPath, global_context) != APR_SUCCESS)
{
char my_error[256];

err_msg("Cannot load %s metric module: %s", modPath,
apr_dso_error(modHandle, my_error, sizeof(my_error)));
if (!modPath)
err_msg("No load path specified for module: %s or incorrect module language designation [%s].\n",
modName, modLanguage);
continue;
}

modp = (mmodule*) modSym;
modp->dynamic_load_handle = (apr_dso_handle_t *)modHandle;
modp->module_name = apr_pstrdup (global_context, modName);
modp->module_params = apr_pstrdup (global_context, modparams);
modp->module_params_list = modParams_list;
modp->config_file = config_file;

...

if (metric_modules != NULL) {
modp->next = metric_modules;
}
metric_modules = modp;
}
}

2.2 metric init

module 载入之后,我们需要做的是建立一个 metric name 和 handler 之间的对应关系表。在扩展 gmod 的 metric 的时候需要定义函数 metric_init(),其实这就是 metric init。主流程中由函数 setup_metric_callbacks() 来做这件事。metrics_modules 就是 2.1 生成的链表,setup_metric_callbacks() 函数就是遍历 metrics_modules 将其保存的 metric name 和 handler 建立关联关系,保存在 hash 表 metric_cb 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
static void
setup_metric_callbacks( void )
{
mmodule *modp = metric_modules;
Ganglia_metric_callback *metric_cb;
/* Create the metric_callbacks hash */
metric_callbacks = apr_hash_make( global_context );

while (modp) {
const Ganglia_25metric* metric_info;
int i;

// metric init。modp->init() 是函数指针,指向 metric_init() 函数。
if (modp->init && modp->init(global_context)) {
err_msg("Module %s failed to initialize.\n", modp->module_name);
}
else
{
apr_pool_cleanup_register(global_context, modp,
modular_metric_cleanup,
apr_pool_cleanup_null);

metric_info = modp->metrics_info;
for (i = 0; metric_info[i].name != NULL; i++)
{
// modp->handler 就是 metric name 对应的 handler 函数,全部放到 hash 表 metric_cb 中
metric_cb = Ganglia_metric_cb_define(metric_info[i].name, modp->handler, i, modp);
if (metric_cb)
metric_cb->info = (Ganglia_25metric*)&(metric_info[i]);
}
}
modp = modp->next;
}
}

我们下面看一下 python module 的处理过程,在文件 mod_python.c 中。

1
2
3
4
5
6
7
8
mmodule python_module =
{
STD_MMODULE_STUFF,
pyth_metric_init, // 对应的就是上面的 modp->init 函数
NULL,
NULL, /* defined dynamically */
pyth_metric_handler, // 对应上面的 modp->handler 函数
};

下面看一下具体函数,函数里面会有 c 调用 python module 的部分代码,比如获得 GIL 全局锁之类的,下面不再细说。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
static int pyth_metric_init (apr_pool_t *p)
{
...
while ((entry = readdir(dp)) != NULL) {
// 判断是否是 python module
modname = is_python_module(entry->d_name);

if (modname == NULL)
continue;

// 查找配置文件
/* Find the specified module configuration in gmond.conf
If this return NULL then either the module config
doesn't exist or the module is disabled. */
module_cfg = find_module_config(modname);
if (!module_cfg)
continue;

PyEval_RestoreThread(gtstate);
// 载入 python module
pmod = PyImport_ImportModule(modname);
...
// 获得 module 中的 metric_init() 函数地址
pinitfunc = PyObject_GetAttrString(pmod, "metric_init");
...
// 构造函数参数
/* Build a parameter dictionary to pass to the module */
pparamdict = build_params_dict(module_cfg);

// 调用 metric_init(param) 函数,这个函数会返回一个 dict,包含了 metric name 和 handler 的对应关系
/* Now call the metric_init method of the python module */
pobj = PyObject_CallFunction(pinitfunc, "(N)", pparamdict);
...
// 处理 metric_init(param) 函数返回的 dict
}

2.3 metric collect

下面在看一下负责具体 metric 采集的 pyth_metric_handler 函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static g_val_t pyth_metric_handler( int metric_index )
{
...
// 调用 metric_name 对应的 handler,handler 的参数就是 metric name
pobj = PyObject_CallFunction(mi[metric_index].pcb, "s", gmi[metric_index].name);
...
// 处理 handler 返回的值
switch (gmi[metric_index].type) {
case GANGLIA_VALUE_STRING:
{
get_python_string_value(pobj, val.str, sizeof(val.str));
break;
}
...
}
}

3. udp send

对于上面的第二节会收集到的 metric data,还需要发送出去(有单播和多播模式),对应的就是 gmond.conf 中的 udp_send_channel。gmond.conf 一般定义如下。

1
2
3
4
5
6
7
8
9
10
11
udp_send_channel {
#bind_hostname = yes # Highly recommended, soon to be default.
# This option tells gmond to use a source address
# that resolves to the machine's hostname. Without
# this, the metrics may appear to come from any
# interface and the DNS names associated with
# those IPs will be used to create the RRDs.
#mcast_join = 10.252.135.105
host = emr-header-1
port = 8649
}

单播模式对应的是 host,多播对应的 mcast_join。main 里面根据 gmond.conf 配置建立相关的 socket 对应的代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
int
main ( int argc, char *argv[] )
{
...
// 建立 socket channel
udp_send_channels = Ganglia_udp_send_channels_create((Ganglia_pool)global_context,
(Ganglia_gmond_config)config_file);
}

Ganglia_udp_send_channels
Ganglia_udp_send_channels_create( Ganglia_pool p, Ganglia_gmond_config config )
{
apr_array_header_t *send_channels = NULL;
cfg_t *cfg=(cfg_t *)config;
int i, num_udp_send_channels = cfg_size( cfg, "udp_send_channel");
apr_pool_t *context = (apr_pool_t*)p;

/* Return null if there are no send channels specified */
if(num_udp_send_channels <= 0)
return (Ganglia_udp_send_channels)send_channels;

/* Create my UDP send array */
send_channels = apr_array_make( context, num_udp_send_channels,
sizeof(apr_socket_t *));
// 解析 gmond.conf 中的所有 udp_send_channel
for(i = 0; i< num_udp_send_channels; i++)
{
udp_send_channel = cfg_getnsec( cfg, "udp_send_channel", i);
host = cfg_getstr( udp_send_channel, "host" );
mcast_join = cfg_getstr( udp_send_channel, "mcast_join" );
mcast_if = cfg_getstr( udp_send_channel, "mcast_if" );
port = cfg_getint( udp_send_channel, "port");
ttl = cfg_getint( udp_send_channel, "ttl");
bind_address = cfg_getstr( udp_send_channel, "bind" );
bind_hostname = cfg_getbool( udp_send_channel, "bind_hostname");

if( mcast_join )
{
/* We'll be listening on a multicast channel */
socket = create_mcast_client(pool, mcast_join, port, ttl, mcast_if, bind_address, bind_hostname);
...
}
else
{
/* Create a UDP socket */
socket = create_udp_client( pool, host, port, mcast_if, bind_address, bind_hostname );
...
}

// socket 入队列
/* Add the socket to the array */
*(apr_socket_t **)apr_array_push(send_channels) = socket;
}
return (Ganglia_udp_send_channels)send_channels;
}

向 socket 发送 metric 的主流程如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
int
main ( int argc, char *argv[] )
{
/* Loop */
for(;!done;)
{
// 每隔固定的时间采集一次 metric
/* Make sure we never wait for negative seconds (shouldn't happen) */
apr_interval_time_t wait = next_collection >= now ? next_collection - now : 1;

// udp_listen_channel 下面细说
if (udp_listen_channels != NULL)
{
/* Pull in incoming data */
poll_udp_listen_channels(wait, now);
}
else
{
/* Sleep until next collection */
apr_sleep( wait );
}

/* only continue if it's time to process our collection groups */
now = apr_time_now();
if(now < next_collection)
continue;

// 收集指标数据并发送
if(!mute)
{
/* collect data from collection_groups */
next_collection = process_collection_groups( now );
}
}
}

apr_time_t
process_collection_groups( apr_time_t now )
{
int i;
apr_time_t next = 0;

/* Run through each collection group and collect any data that needs collecting... */
for(i=0; i< collection_groups->nelts; i++)
{
Ganglia_collection_group *group = ((Ganglia_collection_group **)(collection_groups->elts))[i];
if(group->next_collect <= now)
{
// 收集指标数据
Ganglia_collection_group_collect(group, now);
}
}

/* Run through each collection group and send any data that needs sending... */
for(i=0; i< collection_groups->nelts; i++)
{
Ganglia_collection_group *group = ((Ganglia_collection_group **)(collection_groups->elts))[i];
if( group->next_send <= now )
{
// 发送指标数据
Ganglia_collection_group_send(group, now);
}
}
...
}

由于 Ganglia_collection_group_send(group, now) 的实现涉及比较深的函数调用链,单独拿出来看下。已略去部分无关逻辑,主要看函数调用关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
void
Ganglia_collection_group_send( Ganglia_collection_group *group, apr_time_t now)
{
for(i=0; i< group->metric_array->nelts; i++)
{
// 构造要发送的 metric data
if (override_hostname != NULL)
{
errors = Ganglia_metadata_send_real(gmetric, udp_send_channels, cb->msg.Ganglia_value_msg_u.gstr.metric_id.host);
}
else
{
errors = Ganglia_metadata_send(gmetric, udp_send_channels);
}
...
}
}

int
Ganglia_metadata_send( Ganglia_metric gmetric, Ganglia_udp_send_channels send_channels )
{
return Ganglia_metadata_send_real( gmetric, send_channels, NULL );
}

int
Ganglia_metadata_send_real( Ganglia_metric gmetric, Ganglia_udp_send_channels send_channels, char *override_string )
{
...
// 构造数据
/* Send the encoded data along...*/
return Ganglia_udp_send_message( send_channels, gmetricmsg, len);
}

/* This function will send a datagram to every udp_send_channel specified */
int
Ganglia_udp_send_message(Ganglia_udp_send_channels channels, char *buf, int len )
{
apr_status_t status;
int i;
int num_errors = 0;
apr_size_t size;
apr_array_header_t *chnls=(apr_array_header_t*)channels;

if(!chnls || !buf || len<=0)
return 1;

for(i=0; i< chnls->nelts; i++)
{
apr_socket_t *socket = ((apr_socket_t **)(chnls->elts))[i];
size = len;
// 真正的向 socket 发送数据代码
status = apr_socket_send( socket, buf, &size );
if(status != APR_SUCCESS)
{
num_errors++;
}
}
return num_errors;
}

4. udp recv

第三节的数据发送出去还需要接收,接收的 server 需要在 gmond.conf 中设置 udp_recv_channel,一般配置如下。其中主要的参数是 port 表示接收端口,其他的参数可以参考书籍《Monitoring with ganglia》。

1
2
3
4
5
6
7
8
/* You can specify as many udp_recv_channels as you like as well. */
udp_recv_channel {
port = 8649
retry_bind = true
# Size of the UDP buffer. If you are handling lots of metrics you really
# should bump it up to e.g. 10MB or even higher.
# buffer = 10485760
}

再说主流程代码之前,我们需要先了解一下 I/O 多路复用:server 对于多个 client 请求,也就是 socket 连接(网络 I/O 事件) 进行统一的管理。具体的实现有 select,poll,epoll,这里不再展开。APR(Apache portable Run-time libraries,Apache可移植运行库)主要为上层的应用程序提供一个可以跨越多操作系统平台使用的底层支持接口库。通过 APR 可以非常简单实现 poll 模型,官方示例如下。更多的可以参考 apr-tutorial

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
int main(int argc, const char *argv[])
{
apr_status_t rv;
apr_pool_t *mp;
apr_socket_t *lsock;/* listening socket */
apr_pollset_t *pollset;
apr_int32_t num;
const apr_pollfd_t *ret_pfd;

apr_initialize();
apr_pool_create(&mp, NULL);

lsock = create_listen_sock(mp);
assert(lsock);

// 创建 pollset
apr_pollset_create(&pollset, DEF_POLLSET_NUM, mp, 0);
{
apr_pollfd_t pfd = { mp, APR_POLL_SOCKET, APR_POLLIN, 0, { NULL }, NULL };
pfd.desc.s = lsock;
// 将要监听的 socket 添加到 pollset
apr_pollset_add(pollset, &pfd);
}

// poll 监听与事件处理
while (1) {
// 获取准备就绪的 socket
rv = apr_pollset_poll(pollset, DEF_POLL_TIMEOUT, &num, &ret_pfd);
if (rv == APR_SUCCESS) {
int i;
assert(num > 0);
// 依次处理
/* scan the active sockets */
for (i = 0; i < num; i++) {
if (ret_pfd[i].desc.s == lsock) {
/* the listen socket is readable. that indicates we accepted a new connection */
do_accept(pollset, lsock, mp);
} else {
serv_ctx_t *serv_ctx = ret_pfd[i].client_data;
socket_callback_t cb_func = serv_ctx->cb_func;

cb_func(serv_ctx, pollset, ret_pfd[i].desc.s);
}
}
}
}

return 0;
}

有了上面的铺垫,我们只需要将 gmond 的代码对号入座即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
int
main ( int argc, char *argv[] )
{
...
if(!deaf)
{
setup_listen_channels_pollset();
}
...
}

// 创建
static void
setup_listen_channels_pollset( void )
{
int num_udp_recv_channels = cfg_size( config_file, "udp_recv_channel");
int num_tcp_accept_channels = cfg_size( config_file, "tcp_accept_channel"); // 这个是第 5 节的
int total_listen_channels = num_udp_recv_channels + num_tcp_accept_channels;

if (num_udp_recv_channels > 0) {
// 创建 pollset
if((status = apr_pollset_create(&udp_listen_channels, num_udp_recv_channels, global_context, pollset_opts)) != APR_SUCCESS)
{
...
exit(EXIT_FAILURE);
}
}

// 将 gmond.conf 配置中的所有 udp_recv_channel 都建立 socket 并进行监听
for(i = 0; i< num_udp_recv_channels; i++)
{
// 参数解析
udp_recv_channel = cfg_getnsec( config_file, "udp_recv_channel", i);
mcast_join = cfg_getstr( udp_recv_channel, "mcast_join" );
mcast_if = cfg_getstr( udp_recv_channel, "mcast_if" );
port = cfg_getint( udp_recv_channel, "port");
bindaddr = cfg_getstr( udp_recv_channel, "bind");
family = cfg_getstr( udp_recv_channel, "family");
retry_bind = cfg_getbool( udp_recv_channel, "retry_bind");
buffer = cfg_getint( udp_recv_channel, "buffer");

...
if( mcast_join )
{
/* Listen on the specified multicast channel */
socket = create_mcast_server(pool, sock_family, mcast_join, port, bindaddr, mcast_if );
...
}
else
{
/* Create a UDP server */
socket = create_udp_server( pool, sock_family, port, bindaddr );
...
}

...
/* Build the socket poll file descriptor structure */
socket_pollfd.desc_type = APR_POLL_SOCKET;
socket_pollfd.reqevents = APR_POLLIN;
socket_pollfd.desc.s = socket;
...
/* Add the socket to the pollset */
status = apr_pollset_add(udp_listen_channels, &socket_pollfd);
...
}
}

监听处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
int
main ( int argc, char *argv[] )
{
for(;!done;)
{
if (udp_listen_channels != NULL)
{
/* Pull in incoming data */
poll_udp_listen_channels(wait, now);
}
...
}
}

static void
poll_udp_listen_channels( apr_interval_time_t timeout, apr_time_t now)
{
...
status = apr_pollset_poll(udp_listen_channels, timeout, &num, &descs);
...
for(i = 0; i< num ; i++)
{
Ganglia_channel *channel = descs[i].client_data;
switch( channel->type )
{
case UDP_RECV_CHANNEL:
process_udp_recv_channel(descs+i, now);
udp_last_heard = apr_time_now();
break;
default:
continue;
}
}
}

static void
process_udp_recv_channel(const apr_pollfd_t *desc, apr_time_t now)
{
...
// 收集数据
/* Grab the data */
status = apr_socket_recvfrom(remotesa, socket, 0, buf, &len);

// 数据处理, XDR 协议
/* Create the XDR receive stream */
xdrmem_create(&x, buf, max_udp_message_len, XDR_DECODE);
...
}

5. tcp channel

gmond.conf 还有一个配置项: tcp_accept_channel,根据该配置会在相应的端口建立一个 tcp server,以 xml 形式输出 gmond 收集到的指标数据,比如 gmetad 收集 gmond 数据就是通过这个 tcp server 获取的。

1
2
3
4
5
tcp_accept_channel {
port = 8649
# If you want to gzip XML output
gzip_output = no
}

处理和第四节的 udp_recv_channel 类似:建立 pollset,然后监听处理。pollset 建立也在函数 setup_listen_channels_pollset( void ) 中,不再细说。下面主要说一下处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int
main ( int argc, char *argv[] )
{
/* Create TCP listener thread */
if(!deaf)
{
apr_thread_t *thread;
if (apr_thread_create(&thread, NULL, tcp_listener, NULL, global_context) != APR_SUCCESS)
{
err_msg("Failed to create TCP listener thread. Exiting.\n");
exit(EXIT_FAILURE);
}
}
...
}

首先在 main 函数中通过函数 apr_thread_create 新建一个线程执行函数 tcp_listener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static void* APR_THREAD_FUNC tcp_listener(apr_thread_t *thd, void *data)
{
apr_time_t now;
apr_interval_time_t wait = 100 * 1000; // 100ms

if (tcp_listen_channels == NULL)
return NULL;

debug_msg("[tcp] Starting TCP listener thread...");
for(;!done;)
{
now = apr_time_now();
/* Pull in incoming data */
poll_tcp_listen_channels(wait, now);
}
apr_thread_exit(thd, APR_SUCCESS);

return NULL;
}

函数 tcp_listener 中的 poll_tcp_listen_channels 功能和函数 poll_udp_listen_channels 类似。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
static void
poll_tcp_listen_channels( apr_interval_time_t timeout, apr_time_t now)
{
/* Poll for incoming TCP requests */
status = apr_pollset_poll(tcp_listen_channels, timeout, &num, &descs);

for(i = 0; i< num ; i++)
{
Ganglia_channel *channel = descs[i].client_data;
switch( channel->type )
{
case TCP_ACCEPT_CHANNEL:
debug_msg("[tcp] Request for XML data received.");
process_tcp_accept_channel(descs+i, now);
debug_msg("[tcp] Request for XML data completed.");
break;
default:
continue;
}
}
}

static void
process_tcp_accept_channel(const apr_pollfd_t *desc, apr_time_t now)
{
apr_socket_t *client, *server;
...

status = apr_socket_accept(&client, server, client_context);
...

/* Print the DTD, GANGLIA_XML and CLUSTER tags */
status = print_xml_header(client);

// 逐个处理 host
for(hi = apr_hash_first(client_context, hosts); hi; hi = apr_hash_next(hi))
{
apr_hash_this(hi, NULL, NULL, &val);
status = print_host_start(client, (Ganglia_host *)val);

// 逐个处理 metric
apr_thread_mutex_lock(((Ganglia_host *)val)->mutex);
for(metric_hi = apr_hash_first(client_context, ((Ganglia_host *)val)->metrics);
metric_hi; metric_hi = apr_hash_next(metric_hi))
{
void *metric, *mval;
apr_hash_this(metric_hi, NULL, NULL, &metric);

mval = apr_hash_get(((Ganglia_host *)val)->gmetrics, ((Ganglia_metadata*)metric)->name, APR_HASH_KEY_STRING);

// 向 client 发送 metric
/* Print each of the metrics for a host ... */
if(print_host_metric(client, metric, mval, now) != APR_SUCCESS)
{
/* Release the mutex and close down the accepted socket */
...
return;
}
}
apr_thread_mutex_unlock(((Ganglia_host *)val)->mutex);
}
...
}

上面涉及到以 print 开头的函数就是想 socket 发送数据,可以简单看一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static apr_status_t
print_xml_header( apr_socket_t *client )
{
if(cluster_tag)
{
len = apr_snprintf( clusterxml, 1024,
"<CLUSTER NAME=\"%s\" LOCALTIME=\"%d\" OWNER=\"%s\" LATLONG=\"%s\" URL=\"%s\">\n",
name?name:"unspecified",
(int)(now / APR_USEC_PER_SEC),
owner?owner:"unspecified",
latlong?latlong:"unspecified",
url?url:"unspecified");

return socket_send( client, clusterxml, &len);
}
}

6. 总结

截止到上面,gmond 的主要模块我们基本上已经介绍完了,另外还有一些细节可以根据上面提供了脉络进行梳理。

7. 参考

  1. apr-tutorial
  2. ganglia source code