-
Notifications
You must be signed in to change notification settings - Fork 6
/
prestodb-graphite-emitter
155 lines (138 loc) · 7.34 KB
/
prestodb-graphite-emitter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#!/usr/bin/env python
import configparser
import json
import re
import sys
from time import sleep

try:
    from urllib.parse import urlparse  # Python 3
except ImportError:
    from urlparse import urlparse  # Python 2 fallback

import graphitesend
import requests
# Presto provides a REST API for accessing JMX properties. PrestodbEmitter calls this REST API to extract JMX property values
# and emits those values as metrics to a Graphite server using the standard Graphite protocol.
# The collected metrics include OS metrics, node metrics, garbage collector metrics, query metrics, task manager/executor metrics,
# cluster memory manager metrics, etc. Metrics are sent with the graphitesend package, which must be installed before running the app.
# The requirement can be installed using 'sudo pip install graphitesend'.
# Usage: ./prestodb-graphite-emitter.py [Config file path] [Repeat Interval(seconds)]
class PrestodbEmitter:
    """Collect Presto metrics over its REST API and send them to Graphite.

    The configuration file (parsed with ``configparser``) is read for these
    sections and keys:

    * ``PRESTO_COORDINATOR``: ``IP``, ``PORT``, ``node_path``, ``mbean_path``
    * ``MBEAN_ALIAS``: one entry per mbean alias passed to ``push_metrics``
    * ``NODE_METRICS``: ``NODE_METRICS`` (a JSON list of node attribute names)
    * ``GRAPHITE``: ``IP``, ``PORT``
    """

    # Upper bound, in seconds, for every REST call. Without a timeout a single
    # unresponsive node would block the emitter loop indefinitely.
    HTTP_TIMEOUT_SECONDS = 5

    def __init__(self, config_path):
        """Create the emitter and load its configuration from *config_path*."""
        self.config = configparser.ConfigParser()
        self.read_properties(config_path)

    def read_properties(self, path):
        """Read the config file at *path* and derive the coordinator settings."""
        self.config.read(path)
        coordinator = self.config['PRESTO_COORDINATOR']
        self.presto_coord_url = "http://" + coordinator['IP'] + ":" + coordinator['PORT']
        self.node_path = coordinator['node_path']
        # Default target node, so the push_* methods are usable even before
        # run_emitter() has selected a node.
        self.cluster_node = self.presto_coord_url

    def print_some_data(self):
        """Print a few configuration values (manual test helper)."""
        print(self.config.sections())
        print(self.config['MBEAN_ALIAS']['g1_gc_young'])
        print(self.config['PRESTO_COORDINATOR']['mbean_path'])

    def run_emitter(self):
        """Run one collection round.

        Coordinator-level metrics are pushed first, then the per-node metrics
        for every node returned by ``get_all_cluster_nodes``. The caller is
        responsible for invoking this repeatedly at the desired interval.
        """
        self.cluster_node = self.presto_coord_url
        self.push_node_metrics("node_metrics")
        self.push_metrics("cluster_memory_manager", "memory_manager_metrics")
        self.push_metrics("query_manager", "query_manager_metrics")
        self.push_metrics("query_execution", "query_execution_metrics")
        for node in self.get_all_cluster_nodes():
            # push_metrics and get_graphite_sender both read self.cluster_node.
            self.cluster_node = node
            self.push_metrics("os", "os_metrics")
            self.push_metrics("task_executor", "task_executor_metrics")
            self.push_metrics("task_manager", "task_manager_metrics")
            self.push_metrics("memory", "memory_usage_metrics")
            self.push_metrics("g1_gc_young", "gc_g1_metrics.g1_young_generation")
            self.push_metrics("g1_gc_old", "gc_g1_metrics.g1_old_generation")

    def get_all_cluster_nodes(self):
        """Return the coordinator URL followed by each node URI from the node API.

        If the node API call fails, only the coordinator URL is returned.
        A URI that is already in the list is not added a second time.
        """
        cluster_node_list = [self.presto_coord_url]
        json_metrics = self.get_node_json_metrics(self.presto_coord_url + self.node_path)
        for node_metrics in json_metrics or []:
            uri = node_metrics["uri"]
            if uri not in cluster_node_list:
                cluster_node_list.append(uri)
        return cluster_node_list

    def push_node_metrics(self, prefix):
        """Send the configured node attributes of every cluster node to Graphite.

        Metric names have the form ``<prefix>.<netloc with '.' as '_'>.<attribute>``.
        Nothing is sent when the node API call fails.
        """
        json_metrics = self.get_node_json_metrics(self.presto_coord_url + self.node_path)
        if json_metrics is None:
            return
        # The attribute list is loop-invariant, so it is parsed once per call.
        attributes = json.loads(self.config['NODE_METRICS']['NODE_METRICS'])
        graphite_cli = self.get_graphite_sender("presto")
        try:
            for node_metrics in json_metrics:
                node_name = urlparse(node_metrics["uri"]).netloc.replace(".", "_")
                for attribute in attributes:
                    graphite_cli.send(prefix + "." + node_name + "." + attribute,
                                      node_metrics[attribute])
        finally:
            # A new client is created on every call; close it so sockets do
            # not pile up while the emitter loops.
            graphite_cli.disconnect()

    def push_metrics(self, mbean_alias, prefix):
        """Fetch the mbean registered under *mbean_alias* and send its numeric attributes.

        Only attributes carrying both ``name`` and ``value`` are considered.
        Nothing is sent when the mbean REST call fails.
        """
        json_metrics = self.get_mbean_json_metrics(mbean_alias)
        if json_metrics is None:
            return
        filtered_metrics = {
            attribute["name"]: self.extract_value(attribute["value"])
            for attribute in json_metrics["attributes"]
            if 'name' in attribute and 'value' in attribute
        }
        graphite_cli = self.get_graphite_sender("presto")
        try:
            self.push_filtered_metrics(filtered_metrics, graphite_cli, prefix)
        finally:
            graphite_cli.disconnect()

    def extract_value(self, json_metrics):
        """Recursively flatten a JMX attribute value into nested dicts and scalars.

        * A dict holding both ``key`` and ``value`` becomes ``{key: value}``.
        * Any other dict keeps its keys, with each value converted recursively.
        * A list becomes a dict built from its ``{key, value}`` dict entries;
          other list entries are skipped.
        * Anything else is returned unchanged.
        """
        if isinstance(json_metrics, dict):
            if 'value' in json_metrics and 'key' in json_metrics:
                # Store under the entry's own key, as the list branch does.
                return {json_metrics['key']: self.extract_value(json_metrics['value'])}
            return {k: self.extract_value(v) for k, v in json_metrics.items()}
        if isinstance(json_metrics, list):
            return {
                entry['key']: self.extract_value(entry['value'])
                for entry in json_metrics
                if isinstance(entry, dict) and 'key' in entry and 'value' in entry
            }
        return json_metrics

    def push_filtered_metrics(self, json_metrics, graphite_cli, prefix):
        """Send every int/float leaf of *json_metrics* through *graphite_cli*.

        Nested dicts are walked recursively, with the dotted path of keys
        appended to *prefix* to form the metric name. Values of any other
        type are ignored.
        """
        for k, v in json_metrics.items():
            # str() keeps non-string keys from breaking the name concatenation.
            key = prefix + "." + str(k)
            if isinstance(v, dict):
                self.push_filtered_metrics(v, graphite_cli, key)
            elif isinstance(v, (int, float)):
                print(key)
                graphite_cli.send(key, v)

    def get_graphite_sender(self, prefix):
        """Return a new graphitesend client whose system name is the current node's host.

        The system name is the text between ``//`` and the first ``:`` of
        ``self.cluster_node`` with dots replaced by underscores. If the URL
        has no port, the network location is used instead.
        """
        node = self.cluster_node.replace(".", "_")
        match = re.search(r'//(.+?):', node)
        system_name = match.group(1) if match else urlparse(node).netloc
        return graphitesend.init(system_name=system_name,
                                 graphite_server=self.config['GRAPHITE']['IP'],
                                 graphite_port=int(self.config['GRAPHITE']['PORT']),
                                 prefix=prefix, fqdn_squash=True,
                                 lowercase_metric_names=True, asynchronous=False,
                                 timeout_in_seconds=5)

    def _get_json(self, url):
        """GET *url* and return the decoded JSON body, or None on any failure.

        Connection errors, timeouts, HTTP error statuses and undecodable
        bodies are reported on stderr and turned into None, which the
        callers already treat as "no metrics for this round".
        """
        try:
            response = requests.get(url, timeout=self.HTTP_TIMEOUT_SECONDS)
            response.raise_for_status()
            return response.json()
        except (requests.RequestException, ValueError) as err:
            sys.stderr.write("Failed to fetch %s: %s\n" % (url, err))
            return None

    def get_mbean_json_metrics(self, mbean_alias):
        """Return the JSON for the mbean aliased *mbean_alias* on the current node, or None."""
        mbean_url = (self.cluster_node
                     + self.config['PRESTO_COORDINATOR']['mbean_path']
                     + self.config['MBEAN_ALIAS'][mbean_alias])
        return self._get_json(mbean_url)

    def get_node_json_metrics(self, url):
        """Return the JSON served by the node API at *url*, or None on failure."""
        return self._get_json(url)
# Script entry point. Takes two command-line arguments (config file path and
# repeat interval in seconds) and calls PrestodbEmitter.run_emitter() in a
# loop, sleeping for the interval between rounds until interrupted.
if __name__ == '__main__':
    if len(sys.argv) < 3:
        print("Usage: ./prestodb-graphite-emitter.py [Config file path] [Repeat Interval(seconds)]")
    else:
        # Validate the interval once, before any metrics are emitted, instead
        # of re-parsing it (and possibly failing) after the first round.
        try:
            interval_seconds = int(sys.argv[2])
        except ValueError:
            sys.exit("Repeat interval must be an integer number of seconds, got: " + sys.argv[2])
        emitter = PrestodbEmitter(sys.argv[1])
        print("Emitter started successfully!")
        try:
            while True:
                emitter.run_emitter()
                sleep(interval_seconds)
        except (KeyboardInterrupt, SystemExit):
            # Stop quietly whether the interrupt arrives while emitting or
            # while sleeping.
            pass