netbox-zabbix-sync/modules/tools.py
2025-06-14 20:15:05 +00:00

138 lines
5.0 KiB
Python

"""A collection of tools used by several classes"""
def convert_recordset(recordset):
"""Converts netbox RedcordSet to list of dicts."""
recordlist = []
for record in recordset:
recordlist.append(record.__dict__)
return recordlist
def build_path(endpoint, list_of_dicts):
"""
Builds a path list of related parent/child items.
This can be used to generate a joinable list to
be used in hostgroups.
"""
item_path = []
itemlist = [i for i in list_of_dicts if i["name"] == endpoint]
item = itemlist[0] if len(itemlist) == 1 else None
item_path.append(item["name"])
while item["_depth"] > 0:
itemlist = [i for i in list_of_dicts if i["name"] == str(item["parent"])]
item = itemlist[0] if len(itemlist) == 1 else None
item_path.append(item["name"])
item_path.reverse()
return item_path
def proxy_prepper(proxy_list, proxy_group_list):
"""
Function that takes 2 lists and converts them using a
standardized format for further processing.
"""
output = []
for proxy in proxy_list:
proxy["type"] = "proxy"
proxy["id"] = proxy["proxyid"]
proxy["idtype"] = "proxyid"
proxy["monitored_by"] = 1
output.append(proxy)
for group in proxy_group_list:
group["type"] = "proxy_group"
group["id"] = group["proxy_groupid"]
group["idtype"] = "proxy_groupid"
group["monitored_by"] = 2
output.append(group)
return output
def field_mapper(host, mapper, nbdevice, logger):
"""
Maps NetBox field data to Zabbix properties.
Used for Inventory, Usermacros and Tag mappings.
"""
data = {}
# Let's build an dict for each property in the map
for nb_field, zbx_field in mapper.items():
field_list = nb_field.split("/") # convert str to list based on delimiter
# start at the base of the dict...
value = nbdevice
# ... and step through the dict till we find the needed value
for item in field_list:
value = value[item] if value else None
# Check if the result is usable and expected
# We want to apply any int or float 0 values,
# even if python thinks those are empty.
if (value and isinstance(value, int | float | str)) or (
isinstance(value, int | float) and int(value) == 0
):
data[zbx_field] = str(value)
elif not value:
# empty value should just be an empty string for API compatibility
logger.debug(
f"Host {host}: NetBox lookup for "
f"'{nb_field}' returned an empty value"
)
data[zbx_field] = ""
else:
# Value is not a string or numeral, probably not what the user expected.
logger.error(
f"Host {host}: Lookup for '{nb_field}'"
" returned an unexpected type: it will be skipped."
)
logger.debug(
f"Host {host}: Field mapping complete. "
f"Mapped {len(list(filter(None, data.values())))} field(s)"
)
return data
def remove_duplicates(input_list, sortkey=None):
"""
Removes duplicate entries from a list and sorts the list
"""
output_list = []
if isinstance(input_list, list):
output_list = [dict(t) for t in {tuple(d.items()) for d in input_list}]
if sortkey and isinstance(sortkey, str):
output_list.sort(key=lambda x: x[sortkey])
return output_list
def sanatize_log_output(data):
"""
Used for the update function to Zabbix which
shows the data that its using to update the host.
Removes and sensitive data from the input.
"""
if not isinstance(data, dict):
return data
sanitized_data = data.copy()
# Check if there are any sensitive macros defined in the data
if "macros" in data:
for macro in sanitized_data["macros"]:
# Check if macro is secret type
if not macro["type"] == str(1):
continue
macro["value"] = "********"
# Check for interface data
if "interfaceid" in data:
# Interface tID is a value which is most likely not helpful
# in logging output or for roubleshooting.
del sanitized_data["interfaceid"]
# InterfaceID also hints that this is a interface update.
# A check is required if there are no macro's used for SNMP security parameters.
if not "details" in data:
return sanitized_data
for key, detail in sanitized_data["details"].items():
# If the detail is a secret, we don't want to log it.
if key in ("authpassphrase", "privpassphrase", "securityname", "community"):
# Check if a macro is used.
# If so then logging the output is not a security issue.
if detail.startswith("{$") and detail.endswith("}"):
continue
# A macro is not used, so we sanitize the value.
sanitized_data["details"][key] = "********"
return sanitized_data