Telegraf: dynamically adding custom tags
Background
For a recent project, I wanted to add a custom tag to data coming in from one of Telegraf's built-in input plugins.
The input plugin was the procstat plugin, and the custom data came from Pacemaker (a clustering solution for Linux). I wanted to add a tag indicating whether the current host was the "active" host in my active/passive setup.
The best solution I came up with was to use the recently released execd processor plugin for Telegraf.
How it works
The execd processor plugin runs an external program as a separate process, pipes metrics into that process's STDIN, and reads the processed metrics back from its STDOUT.
[Figure: Telegraf plugins interaction diagram]
Telegraf's metric filtering parameters (such as namepass) can be used to choose which input plugins' data is sent to this processor.
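To make the STDIN/STDOUT contract concrete, here is a minimal sketch of a processor that passes every metric through unchanged. It is written for Python 3, and `process_stream` is a name I made up for illustration, not part of Telegraf. It assumes metrics arrive as one line protocol entry per line.

```python
def process_stream(inp, out):
    """Copy each metric line from inp to out unchanged."""
    for line in inp:
        out.write(line.rstrip("\n") + "\n")
        # Flush after every line: when stdout is a pipe it is usually
        # block-buffered, so the reader would otherwise see metrics late.
        out.flush()
```

A real script would call `process_stream(sys.stdin, sys.stdout)` at the bottom (after `import sys`) and do its tagging work where the line is written out.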
The external program
The external program I wrote does the following:
- Get pacemaker status and cache it for 10 seconds
- Read a line from stdin
- Append the required information as a tag in the data
- Write it to stdout
The caching is just an optimization. It had more to do with reducing log pollution than with any real speed improvement.
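The time-based caching can also be written as a small reusable helper. This is only a sketch in Python 3: `TTLCache` and its parameters are hypothetical names, and it uses `time.monotonic` by default rather than the wall clock, which is my own choice here. The `clock` argument exists so the expiry logic can be tested without sleeping.

```python
import time


class TTLCache:
    """Cache the result of a zero-argument function for ttl seconds."""

    def __init__(self, fetch, ttl=10.0, clock=time.monotonic):
        self._fetch = fetch      # function that produces a fresh value
        self._ttl = ttl          # how long a value stays valid, in seconds
        self._clock = clock      # injectable time source
        self._value = None
        self._expires = None     # None means nothing has been cached yet

    def get(self):
        now = self._clock()
        if self._expires is None or now >= self._expires:
            # Cache is empty or stale: fetch again and restart the timer.
            self._value = self._fetch()
            self._expires = now + self._ttl
        return self._value
```

With something like this, the status lookup would be wrapped once, for example `status = TTLCache(fetch_status, ttl=10)`, and `status.get()` called for every incoming line (`fetch_status` being whatever function queries the cluster).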
Also, I've done the InfluxDB line protocol parsing directly in my code (because my use case is simple), but this can be replaced with an actual library meant for handling line protocol.
#!/usr/bin/python
from __future__ import print_function
from sys import stderr, stdout
import fileinput
import subprocess
import time

cache_value = None
cache_time = 0
resource_name = "VIP"


def get_crm_status():
    """Return "active", "inactive" or None, refreshing at most every 10 seconds."""
    global cache_value, cache_time, resource_name
    ctime = time.time()
    if ctime - cache_time > 10:
        # print("Cache busted", file=stderr)
        try:
            crm_node = subprocess.check_output(["sudo", "/usr/sbin/crm_node", "-n"]).rstrip()
            crm_resource = subprocess.check_output(["sudo", "/usr/sbin/crm_resource", "-r", resource_name, "-W"]).rstrip()
            # The node running the resource is the last word of the output
            active_node = crm_resource.split(" ")[-1]
            if active_node == crm_node:
                cache_value = "active"
            else:
                cache_value = "inactive"
        except (OSError, IOError) as e:
            print("Exception: %s" % e, file=stderr)
            # Don't report active/inactive if crm commands are not found
            cache_value = None
        except Exception as e:
            print("Exception: %s" % e, file=stderr)
            # Report as inactive in other cases by default
            cache_value = "inactive"
        cache_time = ctime
    return cache_value


def lineprotocol_add_tag(line, key, value):
    """Append key=value as a tag right after the measurement name."""
    first_comma = line.find(",")
    first_space = line.find(" ")
    # A comma before the first space means the metric already has tags
    if first_comma >= 0 and first_comma <= first_space:
        split_str = ","
    else:
        split_str = " "
    parts = line.split(split_str)
    first, rest = parts[0], parts[1:]
    first_new = first + "," + key + "=" + value
    return split_str.join([first_new] + rest)


for line in fileinput.input():
    line = line.rstrip()
    crm_status = get_crm_status()
    if crm_status:
        try:
            new_line = lineprotocol_add_tag(line, "crm_status", crm_status)
        except Exception as e:
            print("Exception: %s, Input: %s" % (e, line), file=stderr)
            new_line = line
    else:
        new_line = line
    print(new_line)
    # stdout is block-buffered when it is a pipe, so flush after each metric
    stdout.flush()
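If pulling in a line protocol library feels like too much, a slightly more careful hand-rolled version is also possible. The sketch below is Python 3, and `escape_tag` and `add_tag` are hypothetical helper names that do not appear in the script above. It skips backslash-escaped commas and spaces in the measurement name, and it escapes the tag key and value. It still assumes one well-formed metric per line and does not check whether a tag with the same key already exists.

```python
def escape_tag(text):
    """Backslash-escape the characters that are special in tag keys and values."""
    for ch in (",", "=", " "):
        text = text.replace(ch, "\\" + ch)
    return text


def add_tag(line, key, value):
    """Insert key=value directly after the measurement name of one metric line."""
    i = 0
    while i < len(line):
        ch = line[i]
        if ch == "\\":
            i += 2          # skip the backslash and the character it escapes
            continue
        if ch in ", ":
            break           # first unescaped comma or space ends the measurement
        i += 1
    tag = "," + escape_tag(key) + "=" + escape_tag(value)
    return line[:i] + tag + line[i:]
```

For a line such as `system load1=0.5 1600000000000000000`, this places `,crm_status=active` between `system` and the first space, the same result as the simpler function for unescaped input.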
Telegraf configuration
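Here's a sample Telegraf configuration that routes data from the "system" input plugin through the execd processor plugin and finally writes it to InfluxDB. Because of the namepass filter, metrics from the cpu input skip the processor and go to the output unchanged.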
[agent]
interval = "30s"
[[inputs.cpu]]
[[inputs.system]]
[[processors.execd]]
command = ["/usr/bin/python", "/etc/telegraf/scripts/pacemaker_status.py"]
namepass = ["system"]
[[outputs.influxdb]]
urls = ["http://127.0.0.1:8086"]
database = "telegraf"
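Before wiring a script into a configuration like this, it can help to run it by hand with a few sample lines. The helper below is a rough Python 3 sketch for that; `run_processor` is a name invented here. It feeds the lines on STDIN, closes the pipe, and collects whatever the command prints, so it only suits scripts that exit when their input ends.

```python
import subprocess


def run_processor(command, lines):
    """Feed metric lines to a processor command on stdin and return its stdout lines."""
    proc = subprocess.run(
        command,
        input="\n".join(lines) + "\n",   # one metric per line
        stdout=subprocess.PIPE,
        universal_newlines=True,          # work with str instead of bytes
        check=True,                       # raise if the command exits non-zero
    )
    return proc.stdout.splitlines()
```

For example, `run_processor(["/usr/bin/python", "/etc/telegraf/scripts/pacemaker_status.py"], ["system load1=0.5 1600000000000000000"])` should return the same line with the extra tag, assuming the script and the crm commands are available on that host.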
Other types of dynamic tags
In this example, the value of the tag had to come from an external program. If the tag can be calculated from the incoming data itself, things are much simpler: Telegraf ships with many processor plugins, and a lot can be achieved using just those.