// 查找配置文件 /* Find the specified module configuration in gmond.conf If this return NULL then either the module config doesn't exist or the module is disabled. */ module_cfg = find_module_config(modname); if (!module_cfg) continue;
PyEval_RestoreThread(gtstate); // 载入 python module pmod = PyImport_ImportModule(modname); ... // 获得 module 中的 metric_init() 函数地址 pinitfunc = PyObject_GetAttrString(pmod, "metric_init"); ... // 构造函数参数 /* Build a parameter dictionary to pass to the module */ pparamdict = build_params_dict(module_cfg); // 调用 metric_init(param) 函数,这个函数会返回一个 dict,包含了 metric name 和 handler 的对应关系 /* Now call the metric_init method of the python module */ pobj = PyObject_CallFunction(pinitfunc, "(N)", pparamdict); ... // 处理 metric_init(param) 函数返回的 dict }
udp_send_channel {
  #bind_hostname = yes # Highly recommended, soon to be default.
                       # This option tells gmond to use a source address
                       # that resolves to the machine's hostname.  Without
                       # this, the metrics may appear to come from any
                       # interface and the DNS names associated with
                       # those IPs will be used to create the RRDs.
  #mcast_join = 10.252.135.105
  host = emr-header-1
  port = 8649
}
int main ( int argc, char *argv[] ) { /* Loop */ for(;!done;) { // 每隔固定的时间采集一次 metric /* Make sure we never wait for negative seconds (shouldn't happen) */ apr_interval_time_t wait = next_collection >= now ? next_collection - now : 1; // udp_listen_channel 下面细说 if (udp_listen_channels != NULL) { /* Pull in incoming data */ poll_udp_listen_channels(wait, now); } else { /* Sleep until next collection */ apr_sleep( wait ); }
/* only continue if it's time to process our collection groups */ now = apr_time_now(); if(now < next_collection) continue; // 收集指标数据并发送 if(!mute) { /* collect data from collection_groups */ next_collection = process_collection_groups( now ); } } }
/*
 * Walk the collection_groups array twice: the first pass collects from every
 * group whose next_collect time is <= now, the second pass sends from every
 * group whose next_send time is <= now.
 *
 * The tail of the function is elided ("...") in this excerpt; presumably it
 * computes and returns the next wake-up time via `next` -- confirm against the
 * full source.
 */
apr_time_t
process_collection_groups( apr_time_t now )
{
  int i;
  apr_time_t next = 0;

  /* Run through each collection group and collect any data that needs collecting... */
  for(i=0; i< collection_groups->nelts; i++)
    {
      Ganglia_collection_group *group = ((Ganglia_collection_group **)(collection_groups->elts))[i];
      if(group->next_collect <= now)
        {
          /* Collect the metric data. */
          Ganglia_collection_group_collect(group, now);
        }
    }

  /* Run through each collection group and send any data that needs sending... */
  for(i=0; i< collection_groups->nelts; i++)
    {
      Ganglia_collection_group *group = ((Ganglia_collection_group **)(collection_groups->elts))[i];
      if( group->next_send <= now )
        {
          /* Send the metric data. */
          Ganglia_collection_group_send(group, now);
        }
    }
  ...
}
/*
 * Send a metric's metadata message over the given UDP send channels.
 *
 * The part that builds the message (gmetricmsg, len) is elided ("...") in this
 * excerpt.  The return value is whatever Ganglia_udp_send_message() returns.
 */
int
Ganglia_metadata_send_real( Ganglia_metric gmetric, Ganglia_udp_send_channels send_channels, char *override_string )
{
  ... /* builds the message data (elided in this excerpt) */

  /* Send the encoded data along...*/
  return Ganglia_udp_send_message( send_channels, gmetricmsg, len);
}
/* This function will send a datagram to every udp_send_channel specified */ int Ganglia_udp_send_message(Ganglia_udp_send_channels channels, char *buf, int len ) { apr_status_t status; int i; int num_errors = 0; apr_size_t size; apr_array_header_t *chnls=(apr_array_header_t*)channels;
第三节中发送出去的数据还需要有接收方。负责接收的 server 需要在 gmond.conf 中配置 udp_recv_channel,常见配置如下。其中最主要的参数是 port,表示接收端口;其他参数可以参考《Monitoring with Ganglia》一书。
/* You can specify as many udp_recv_channels as you like as well. */
udp_recv_channel {
  port = 8649
  retry_bind = true
  # Size of the UDP buffer. If you are handling lots of metrics you really
  # should bump it up to e.g. 10MB or even higher.
  # buffer = 10485760
}
// poll 监听与事件处理 while (1) { // 获取准备就绪的 socket rv = apr_pollset_poll(pollset, DEF_POLL_TIMEOUT, &num, &ret_pfd); if (rv == APR_SUCCESS) { int i; assert(num > 0); // 依次处理 /* scan the active sockets */ for (i = 0; i < num; i++) { if (ret_pfd[i].desc.s == lsock) { /* the listen socket is readable. that indicates we accepted a new connection */ do_accept(pollset, lsock, mp); } else { serv_ctx_t *serv_ctx = ret_pfd[i].client_data; socket_callback_t cb_func = serv_ctx->cb_func;
/*
 * Main loop (excerpt focused on the receive path): while the daemon is not
 * done, incoming UDP data is pulled in whenever at least one UDP listen
 * channel exists.  Everything else in the loop body is elided ("...").
 */
int
main ( int argc, char *argv[] )
{
  while (!done)
    {
      if (udp_listen_channels != NULL)
        {
          /* Pull in incoming data */
          poll_udp_listen_channels(wait, now);
        }
      ...
    }
}
/*
 * Poll the TCP listen pollset once and service every ready accept channel.
 *
 * timeout: passed to apr_pollset_poll() as its wait time.
 * now:     timestamp forwarded to process_tcp_accept_channel().
 *
 * Returns without doing any work when the poll does not report APR_SUCCESS
 * (timeout or error), because num/descs are only meaningful on success.
 */
static void
poll_tcp_listen_channels( apr_interval_time_t timeout, apr_time_t now)
{
  apr_status_t status;
  const apr_pollfd_t *descs = NULL;
  apr_int32_t num = 0;
  apr_int32_t i;

  /* Poll for incoming TCP requests */
  status = apr_pollset_poll(tcp_listen_channels, timeout, &num, &descs);
  if (status != APR_SUCCESS)
    {
      return;
    }

  for(i = 0; i< num ; i++)
    {
      Ganglia_channel *channel = descs[i].client_data;
      switch( channel->type )
        {
        case TCP_ACCEPT_CHANNEL:
          debug_msg("[tcp] Request for XML data received.");
          process_tcp_accept_channel(descs+i, now);
          debug_msg("[tcp] Request for XML data completed.");
          break;
        default:
          /* Any other channel type is not handled here. */
          continue;
        }
    }
}
mval = apr_hash_get(((Ganglia_host *)val)->gmetrics, ((Ganglia_metadata*)metric)->name, APR_HASH_KEY_STRING); // 向 client 发送 metric /* Print each of the metrics for a host ... */ if(print_host_metric(client, metric, mval, now) != APR_SUCCESS) { /* Release the mutex and close down the accepted socket */ ... return; } } apr_thread_mutex_unlock(((Ganglia_host *)val)->mutex); } ... }