int main ( int argc, char *argv[] ) { num_sources = number_of_datasources( args_info.conf_arg ); ... /* Get the real number of data sources later */ sources = hash_create( num_sources + 10 ); for(;;) { // 随机时间间隔采集 /* Do at a random interval, between (shortest_step/2) +/- METADATA_SLEEP_RANDOMIZE percent */ random_sleep_factor = (1 + (METADATA_SLEEP_RANDOMIZE / 50.0) * ((rand_r(&rand_seed) - RAND_MAX/2)/(float)RAND_MAX)); sleep_time = random_sleep_factor * apr_time_from_sec(c->shortest_step) / 2; /* Make sure the sleep time is at least 1 second */ if(apr_time_sec(apr_time_now() + sleep_time) < (METADATA_MINIMUM_SLEEP + apr_time_sec(apr_time_now()))) sleep_time += apr_time_from_sec(METADATA_MINIMUM_SLEEP); apr_sleep(sleep_time); // 收集数据 /* Sum the new values */ hash_foreach(root.authority, do_root_summary, NULL ); } }
// 类似 C++ 的 STL 中的 for_each int hash_foreach (hash_t * hash, int (*func)(datum_t *, datum_t *, void *), void *arg) { for (i = 0; i < hash->size && !stop; i++) { apr_thread_rwlock_rdlock(hash->lock[i]); for (bucket = &hash->node[i]; bucket != NULL && bucket->key != NULL; bucket = bucket->next) { if (bucket->key == NULL) continue; stop = func(bucket->key, bucket->val, arg); if (stop) break; } apr_thread_rwlock_unlock(hash->lock[i]); } returnstop; }
/*
 * Sums the metric summaries from all data sources.
 *
 * Used as the callback passed to hash_foreach(), which treats a non-zero
 * return value as a request to stop iterating; returning 0 lets the
 * traversal continue to the next entry.
 */
static int do_root_summary(datum_t *key, datum_t *val, void *arg)
{
    /* ... (body omitted in this excerpt) ... */
    return 0;
}
3. tcp server: xml_port
gmetad 会根据配置文件新建两种 tcp server,一种是根据配置 xml_port(默认为 8651),一种是根据配置 interactive_port(默认为 8652)。根据 xml_port 创建的 tcp server 会以 xml 格式返回 gmetad 收集的所有集群数据,多层 gmetad 结构的话可以使用这些数据。根据 interactive_port 创建的 tcp server 一般是用来和 gweb 交互的,并且可以指定一些信息。先看 xml_port 的处理逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
int main ( int argc, char *argv[] ) { parse_config_file ( args_info.conf_arg ); server_socket = g_tcp_socket_server_new( c->xml_port ); ... interactive_socket = g_tcp_socket_server_new( c->interactive_port ); ... /* Spin off the non-interactive server threads. (Half as many as interactive). */ for (i=0; i < c->server_threads/2; i++) pthread_create(&pid, &attr, server_thread, (void*) 0);
/* Spin off the interactive server threads. */ for (i=0; i < c->server_threads; i++) pthread_create(&pid, &attr, server_thread, (void*) 1); }
从上面代码中可以看出 gmetad 会使用 pthread_create 创建线程负责 tcp server 的请求处理。server_threads 是配置文件中 server_threads 指定的,默认为4。xml_port server 会使用 server_threads/2 个线程,interactive_port server 会使用 server_threads 个线程。