Ganglia 源码剖析之 gmetad

0. 引言

gmetad 负责汇总 gmond 节点的指标数据,相比 gmond 来说要简单很多。

1. 主流程

gmetad 的主要工作如下:

  • 收集 gmond 数据
  • 指标数据落盘
  • 提供指标数据查询(xml_port 和 interactive port)
1
2
3
4
5
6
7
8
9
10
int
main ( int argc, char *argv[] )
{

...
// 命令行参数解析
if (cmdline_parser(argc, argv, &args_info) != 0)
err_quit("command-line parser error");


}

2. gmond 数据收集

在 gmetad 的 conf 会定义 datasource,类似如下。

1
2
3
data_source "my cluster" 10 localhost my.machine.edu:8649 1.2.3.5:8655 
data_source "my grid" 50 1.3.4.7:8655 grid.org:8651 grid-backup.org:8651
data_source "another source" 1.3.4.8:8655 1.3.4.8

data_source 会指定 gmetad 采集的 gmond 的主机信息:server_ip 或者 server_hostname 和 port。如果 port 不指定则默认为 8649。data_source 后面可以指定多个 server,gmetad 采集的时候顺序采集 server list,指定多个 server list 的好处在于可以避免单点故障。收集 data source 的 metric 的主要代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
int
main ( int argc, char *argv[] )
{
num_sources = number_of_datasources( args_info.conf_arg );
...
/* Get the real number of data sources later */
sources = hash_create( num_sources + 10 );

for(;;)
{
// 随机时间间隔采集
/* Do at a random interval, between
(shortest_step/2) +/- METADATA_SLEEP_RANDOMIZE percent */
random_sleep_factor = (1 + (METADATA_SLEEP_RANDOMIZE / 50.0) * ((rand_r(&rand_seed) - RAND_MAX/2)/(float)RAND_MAX));
sleep_time = random_sleep_factor * apr_time_from_sec(c->shortest_step) / 2;
/* Make sure the sleep time is at least 1 second */
if(apr_time_sec(apr_time_now() + sleep_time) < (METADATA_MINIMUM_SLEEP + apr_time_sec(apr_time_now())))
sleep_time += apr_time_from_sec(METADATA_MINIMUM_SLEEP);
apr_sleep(sleep_time);

// 收集数据
/* Sum the new values */
hash_foreach(root.authority, do_root_summary, NULL );
}
}

// 类似 C++ 的 STL 中的 for_each
int
hash_foreach (hash_t * hash, int (*func)(datum_t *, datum_t *, void *), void *arg)
{
for (i = 0; i < hash->size && !stop; i++)
{
apr_thread_rwlock_rdlock(hash->lock[i]);
for (bucket = &hash->node[i]; bucket != NULL && bucket->key != NULL; bucket = bucket->next)
{
if (bucket->key == NULL) continue;
stop = func(bucket->key, bucket->val, arg);
if (stop) break;
}
apr_thread_rwlock_unlock(hash->lock[i]);
}
return stop;
}

/* Sums the metric summaries from all data sources. */
static int
do_root_summary( datum_t *key, datum_t *val, void *arg ) {}

3. tcp server: xml_port

gmetad 会根据配置文件新建两种 tcp server,一种是根据配置 xml_port(默认为 8651),一种是根据配置 interactive_port(默认为 8652)。根据 xml_port 创建的 tcp server 会以 xml 格式返回 gmetad 收集的所有集群数据,多层 gmetad 结构的话可以使用这些数据。根据 interactive_port 创建的 tcp server 一般是用来和 gweb 交互的,并且可以指定一些信息。先看 xml_port 的处理逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int
main ( int argc, char *argv[] )
{

parse_config_file ( args_info.conf_arg );

server_socket = g_tcp_socket_server_new( c->xml_port );
...

interactive_socket = g_tcp_socket_server_new( c->interactive_port );
...

/* Spin off the non-interactive server threads. (Half as many as interactive). */
for (i=0; i < c->server_threads/2; i++)
pthread_create(&pid, &attr, server_thread, (void*) 0);

/* Spin off the interactive server threads. */
for (i=0; i < c->server_threads; i++)
pthread_create(&pid, &attr, server_thread, (void*) 1);
}

从上面代码中可以看出 gmetad 会使用 pthread_create 创建进程负责 tcp server 的请求处理。server_threads 是 配置文件中 server_threads 指定的,默认为4。xml_port server 会使用 server_threads/2 个进程,interactive_port server 会使用 server_threads 个进程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
void *
server_thread (void *arg)
{

int interactive = (arg != NULL);

for (;;)
{
if (interactive)
{
pthread_mutex_lock(&server_interactive_mutex);
SYS_CALL( client.fd, accept(interactive_socket->sockfd, (struct sockaddr *) &(client.addr), &len));
pthread_mutex_unlock(&server_interactive_mutex);
ganglia_scoreboard_inc(TCP_REQS_ALL);
ganglia_scoreboard_inc(TCP_REQS_INTXML);
now = apr_time_now();
}
else
{
pthread_mutex_lock ( &server_socket_mutex );
SYS_CALL( client.fd, accept(server_socket->sockfd, (struct sockaddr *) &(client.addr), &len));
pthread_mutex_unlock( &server_socket_mutex );
ganglia_scoreboard_inc(TCP_REQS_ALL);
ganglia_scoreboard_inc(TCP_REQS_XML);
now = apr_time_now();
}

if (interactive)
{
request_len = readline(client.fd, request, REQUESTLEN);
// interactive 模式要处理 request
}
else
{
ganglia_scoreboard_inc(TCP_REQS_ALL);
strcpy(request, "/");
}

// 输出 xml 数据
if(root_report_start(&client))
{
...
}
// 输出集群的状态信息
if (process_path(&client, request, &rootdatum, NULL))
{
...
}
// 输出 xml 数据
if(root_report_end(&client))
{
...
}
}
}

4. tcp server: interactive port

类似 xml_port 的 tcp server 处理,不再赘述。

5. 指标数据落盘

我们知道 gmetad 会将机器收集的指标数据存放在 rrd 文件中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int
main ( int argc, char *argv[] )
{

for(;;)
{
...
/* Sum the new values */
hash_foreach(root.authority, do_root_summary, NULL );

/* summary completed */
pthread_mutex_unlock(root.sum_finished);

// 落盘
/* Save them to RRD */
hash_foreach(root.metric_summary, write_root_summary, NULL);
}
}

6. 总结

gmetad 相比 gmond 要简单很多,从源码结构中也可以看出来。