|
| 1 | +import collectd |
| 2 | +from datetime import datetime |
| 3 | +from memsql.common.random_aggregator_pool import RandomAggregatorPool |
| 4 | + |
| 5 | +def memsql_config(config): |
| 6 | + """ Handle setting up collectd-memsql. """ |
| 7 | + |
| 8 | + # This is a data structure passed to every memsql_write. We store |
| 9 | + # all the configuration and globals here |
| 10 | + data = dict( |
| 11 | + host=None, |
| 12 | + port=None, |
| 13 | + user='root', |
| 14 | + password='', |
| 15 | + database='dashboard', |
| 16 | + typesdb={}, |
| 17 | + previous_values={}, |
| 18 | + pool=RandomAggregatorPool() |
| 19 | + ) |
| 20 | + |
| 21 | + for child in config.children: |
| 22 | + collectd.debug("Config settings %s" % child.key) |
| 23 | + key = child.key.lower() |
| 24 | + if key == 'host': |
| 25 | + data['host'] = child.values[0] |
| 26 | + elif key == 'port': |
| 27 | + data['port'] = int(child.values[0]) |
| 28 | + elif key == 'user': |
| 29 | + data['user'] = child.values[0] |
| 30 | + elif key == 'password': |
| 31 | + data['password'] = child.values[0] |
| 32 | + elif key == 'database': |
| 33 | + data['database'] = child.values[0] |
| 34 | + elif key == 'typesdb': |
| 35 | + for v in child.values: |
| 36 | + memsql_parse_types_file(v, data) |
| 37 | + |
| 38 | + assert 'host' in data, 'MemSQL host is not defined' |
| 39 | + assert 'port' in data, 'MemSQL port is not defined' |
| 40 | + |
| 41 | + collectd.info('Initializing collectd-memsql with %s:%s' % (data['host'], data['port'])) |
| 42 | + collectd.register_write(memsql_write, data) |
| 43 | + |
| 44 | +collectd.register_config(memsql_config) |
| 45 | + |
| 46 | +######################### |
| 47 | +## Write/utility functions |
| 48 | + |
| 49 | +def memsql_parse_types_file(path, data): |
| 50 | + """ This function tries to parse a collectd compliant types.db file. |
| 51 | + Basically stolen from collectd-carbon. |
| 52 | + """ |
| 53 | + collectd.debug('memsql_parse_types_file') |
| 54 | + types = data['typesdb'] |
| 55 | + |
| 56 | + f = open(path, 'r') |
| 57 | + |
| 58 | + for line in f: |
| 59 | + fields = line.split() |
| 60 | + if len(fields) < 2: |
| 61 | + continue |
| 62 | + |
| 63 | + type_name = fields[0] |
| 64 | + |
| 65 | + if type_name[0] == '#': |
| 66 | + continue |
| 67 | + |
| 68 | + v = [] |
| 69 | + for ds in fields[1:]: |
| 70 | + ds = ds.rstrip(',') |
| 71 | + ds_fields = ds.split(':') |
| 72 | + |
| 73 | + if len(ds_fields) != 4: |
| 74 | + collectd.info('memsql_writer: cannot parse data source %s on type %s' % ( ds, type_name )) |
| 75 | + continue |
| 76 | + |
| 77 | + v.append(ds_fields) |
| 78 | + |
| 79 | + types[type_name] = v |
| 80 | + |
| 81 | + f.close() |
| 82 | + |
| 83 | +def memsql_connect(data): |
| 84 | + """ Returns a live MemSQL connection. """ |
| 85 | + return data['pool'].connect(data['host'], data['port'], data['user'], data['password'], data['database']) |
| 86 | + |
| 87 | +def memsql_write(collectd_sample, data=None): |
| 88 | + """ Write handler for collectd. |
| 89 | + This function is called per sample taken from every plugin. It is |
| 90 | + parallelized among multiple threads by collectd. |
| 91 | + """ |
| 92 | + types = data['typesdb'] |
| 93 | + if collectd_sample.type not in types: |
| 94 | + collectd.info('memsql_writer: do not know how to handle type %s. do you have all your types.db files configured?' % collectd_sample.type) |
| 95 | + return |
| 96 | + |
| 97 | + value_types = types[collectd_sample.type] |
| 98 | + |
| 99 | + if len(value_types) != len(collectd_sample.values): |
| 100 | + collectd.info('memsql_writer: differing number of values for type %s' % collectd_sample.type) |
| 101 | + return |
| 102 | + |
| 103 | + for (value_type, value) in zip(value_types, collectd_sample.values): |
| 104 | + persist_value(value, value_type[0], value_type[1], collectd_sample, data) |
| 105 | + |
| 106 | +def persist_value(new_value, data_source_name, data_source_type, collectd_sample, data): |
| 107 | + """ Persist a new value to the database. |
| 108 | + Handles converting COUNTER and DERIVE types. |
| 109 | + """ |
| 110 | + rate = None |
| 111 | + |
| 112 | + # special case counter and derive since we only care about the |
| 113 | + # "difference" between their values (rather than their actual value) |
| 114 | + if data_source_type == 'COUNTER' or data_source_type == 'DERIVE': |
| 115 | + key = (data_source_name, |
| 116 | + data_source_type, |
| 117 | + collectd_sample.plugin, |
| 118 | + collectd_sample.plugin_instance, |
| 119 | + collectd_sample.type, |
| 120 | + collectd_sample.type_instance) |
| 121 | + |
| 122 | + # get the previous values if they exist |
| 123 | + old_value, old_time = data['previous_values'].get(key, [None, None]) |
| 124 | + |
| 125 | + # save the new value in previous values |
| 126 | + data['previous_values'][key] = (new_value, collectd_sample.time) |
| 127 | + |
| 128 | + if old_value is None: |
| 129 | + # this was the first value, nothing to do |
| 130 | + return |
| 131 | + |
| 132 | + # don't let the time delta fall below 1 |
| 133 | + time_delta = float(max(1, collectd_sample.time - old_time)) |
| 134 | + |
| 135 | + # The following formula handles wrapping COUNTER data types |
| 136 | + # around since COUNTER's should never be negative. |
| 137 | + # Taken from: https://collectd.org/wiki/index.php/Data_source |
| 138 | + if data_source_type == 'COUNTER' and new_value < old_value: |
| 139 | + if old_value < 2 ** 32: |
| 140 | + rate = (2 ** 32 - old_value + new_value) / time_delta |
| 141 | + else: |
| 142 | + rate = (2 ** 64 - old_value + new_value) / time_delta |
| 143 | + else: |
| 144 | + rate = (new_value - old_value) / time_delta |
| 145 | + |
| 146 | + elif data_source_type == 'GAUGE' or data_source_name == 'ABSOLUTE': |
| 147 | + rate = new_value |
| 148 | + |
| 149 | + else: |
| 150 | + collectd.debug("MemSQL collectd. Undefined data source %s" % data_source_type) |
| 151 | + return |
| 152 | + |
| 153 | + query_params = [ |
| 154 | + datetime.fromtimestamp(int(collectd_sample.time)).strftime('%Y-%m-%d %H:%M:%S'), |
| 155 | + collectd_sample.host, |
| 156 | + collectd_sample.plugin, |
| 157 | + collectd_sample.type, |
| 158 | + collectd_sample.plugin_instance, |
| 159 | + collectd_sample.type_instance, |
| 160 | + data_source_name, |
| 161 | + rate |
| 162 | + ] |
| 163 | + |
| 164 | + with memsql_connect(data) as conn: |
| 165 | + conn.execute(""" |
| 166 | + INSERT INTO analytics |
| 167 | + (created, host, plugin, type, instance, category, value_name, value) |
| 168 | + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) |
| 169 | + """, *query_params) |
| 170 | + |
0 commit comments