🔊 Removed f-strings usage from logs

This commit is contained in:
Wouter de Bruijn 2025-06-25 13:56:41 +02:00
parent e0ec3c0632
commit 57c7f83e6a
No known key found for this signature in database
GPG Key ID: AC71F96733B92BFA
6 changed files with 246 additions and 176 deletions

View File

@ -26,6 +26,7 @@ from modules.config import load_config
config = load_config()
class PhysicalDevice:
# pylint: disable=too-many-instance-attributes, too-many-arguments, too-many-positional-arguments
"""
@ -97,7 +98,7 @@ class PhysicalDevice:
if config["device_cf"] in self.nb.custom_fields:
self.zabbix_id = self.nb.custom_fields[config["device_cf"]]
else:
e = f'Host {self.name}: Custom field {config["device_cf"]} not present'
e = f"Host {self.name}: Custom field {config['device_cf']} not present"
self.logger.error(e)
raise SyncInventoryError(e)
@ -111,9 +112,10 @@ class PhysicalDevice:
self.visible_name = self.nb.name
self.use_visible_name = True
self.logger.info(
f"Host {self.visible_name} contains special characters. "
f"Using {self.name} as name for the NetBox object "
f"and using {self.visible_name} as visible name in Zabbix."
"Host %s contains special characters. Using %s as name for the NetBox object and using %s as visible name in Zabbix.",
self.visible_name,
self.name,
self.visible_name,
)
else:
pass
@ -126,8 +128,8 @@ class PhysicalDevice:
self.nb,
self.nb_api_version,
logger=self.logger,
nested_sitegroup_flag=config['traverse_site_groups'],
nested_region_flag=config['traverse_regions'],
nested_sitegroup_flag=config["traverse_site_groups"],
nested_region_flag=config["traverse_regions"],
nb_groups=nb_site_groups,
nb_regions=nb_regions,
)
@ -139,12 +141,12 @@ class PhysicalDevice:
# Remove duplicates and None values
self.hostgroups = list(filter(None, list(set(self.hostgroups))))
if self.hostgroups:
self.logger.debug(f"Host {self.name}: Should be member "
f"of groups: {self.hostgroups}")
self.logger.debug(
"Host %s: Should be member of groups: %s", self.name, self.hostgroups
)
return True
return False
def set_template(self, prefer_config_context, overrule_custom):
"""Set Template"""
self.zbx_template_names = None
@ -210,9 +212,10 @@ class PhysicalDevice:
# Set inventory mode. Default is disabled (see class init function).
if config["inventory_mode"] == "disabled":
if config["inventory_sync"]:
self.logger.error(f"Host {self.name}: Unable to map NetBox inventory to Zabbix. "
"Inventory sync is enabled in "
"config but inventory mode is disabled.")
self.logger.error(
"Host %s: Unable to map NetBox inventory to Zabbix. Inventory sync is enabled in config but inventory mode is disabled",
self.name,
)
return True
if config["inventory_mode"] == "manual":
self.inventory_mode = 0
@ -220,17 +223,20 @@ class PhysicalDevice:
self.inventory_mode = 1
else:
self.logger.error(
f"Host {self.name}: Specified value for inventory mode in"
f" config is not valid. Got value {config['inventory_mode']}"
"Host %s: Specified value for inventory mode in config is not valid. Got value %s",
self.name,
config["inventory_mode"],
)
return False
self.inventory = {}
if config["inventory_sync"] and self.inventory_mode in [0, 1]:
self.logger.debug(f"Host {self.name}: Starting inventory mapper.")
self.logger.debug("Host %s: Starting inventory mapper.", self.name)
self.inventory = field_mapper(
self.name, self._inventory_map(), nbdevice, self.logger
)
self.logger.debug(f"Host {self.name}: Resolved inventory: {self.inventory}")
self.logger.debug(
"Host %s: Resolved inventory: %s", self.name, self.inventory
)
return True
def isCluster(self):
@ -268,13 +274,14 @@ class PhysicalDevice:
masterid = self.getClusterMaster()
if masterid == self.id:
self.logger.info(
f"Host {self.name} is primary cluster member. "
f"Modifying hostname from {self.name} to "
+ f"{self.nb.virtual_chassis.name}."
"Host %s is primary cluster member. Modifying hostname from %s to %s.",
self.name,
self.name,
self.nb.virtual_chassis.name,
)
self.name = self.nb.virtual_chassis.name
return True
self.logger.info(f"Host {self.name} is non-primary cluster member.")
self.logger.info("Host %s is non-primary cluster member.", self.name)
return False
def zbxTemplatePrepper(self, templates):
@ -306,8 +313,10 @@ class PhysicalDevice:
"name": zbx_template["name"],
}
)
e = (f"Host {self.name}: Found template '{zbx_template['name']}' "
f"(ID:{zbx_template['templateid']})")
e = (
f"Host {self.name}: Found template '{zbx_template['name']}' "
f"(ID:{zbx_template['templateid']})"
)
self.logger.debug(e)
# Return error should the template not be found in Zabbix
if not template_match:
@ -331,7 +340,7 @@ class PhysicalDevice:
self.group_ids.append({"groupid": group["groupid"]})
e = (
f"Host {self.name}: Matched group "
f"\"{group['name']}\" (ID:{group['groupid']})"
f'"{group["name"]}" (ID:{group["groupid"]})'
)
self.logger.debug(e)
if len(self.group_ids) == len(self.hostgroups):
@ -412,7 +421,7 @@ class PhysicalDevice:
macros = ZabbixUsermacros(
self.nb,
self._usermacro_map(),
config['usermacro_sync'],
config["usermacro_sync"],
logger=self.logger,
host=self.name,
)
@ -430,14 +439,14 @@ class PhysicalDevice:
tags = ZabbixTags(
self.nb,
self._tag_map(),
tag_sync=config['tag_sync'],
tag_lower=config['tag_lower'],
tag_name=config['tag_name'],
tag_value=config['tag_value'],
tag_sync=config["tag_sync"],
tag_lower=config["tag_lower"],
tag_name=config["tag_name"],
tag_value=config["tag_value"],
logger=self.logger,
host=self.name,
)
if config['tag_sync'] is False:
if config["tag_sync"] is False:
self.tags = []
return False
self.tags = tags.generate()
@ -477,12 +486,12 @@ class PhysicalDevice:
# If the proxy name matches
if proxy["name"] == proxy_name:
self.logger.debug(
f"Host {self.name}: using {proxy['type']}" f" '{proxy_name}'"
"Host %s: using {proxy['type']} '%s'", self.name, proxy_name
)
self.zbxproxy = proxy
return True
self.logger.warning(
f"Host {self.name}: unable to find proxy {proxy_name}"
"Host %s: unable to find proxy %s", self.name, proxy_name
)
return False
@ -554,7 +563,7 @@ class PhysicalDevice:
self.create_journal_entry("success", msg)
else:
self.logger.error(
f"Host {self.name}: Unable to add to Zabbix. Host already present."
"Host %s: Unable to add to Zabbix. Host already present.", self.name
)
def createZabbixHostgroup(self, hostgroups):
@ -612,7 +621,9 @@ class PhysicalDevice:
)
self.logger.error(e)
raise SyncExternalError(e) from None
self.logger.info(f"Host {self.name}: updated with data {sanatize_log_output(kwargs)}.")
self.logger.info(
"Host %s: updated with data %s.", self.name, sanatize_log_output(kwargs)
)
self.create_journal_entry("info", "Updated host in Zabbix with latest NB data.")
def ConsistencyCheck(
@ -623,7 +634,7 @@ class PhysicalDevice:
Checks if Zabbix object is still valid with NetBox parameters.
"""
# If group is found or if the hostgroup is nested
if not self.setZabbixGroupID(groups): # or len(self.hostgroups.split("/")) > 1:
if not self.setZabbixGroupID(groups): # or len(self.hostgroups.split("/")) > 1:
if create_hostgroups:
# Script is allowed to create a new hostgroup
new_groups = self.createZabbixHostgroup(groups)
@ -672,28 +683,30 @@ class PhysicalDevice:
raise SyncInventoryError(e)
host = host[0]
if host["host"] == self.name:
self.logger.debug(f"Host {self.name}: Hostname in-sync.")
self.logger.debug("Host %s: Hostname in-sync.", self.name)
else:
self.logger.info(
f"Host {self.name}: Hostname OUT of sync. "
f"Received value: {host['host']}"
"Host %s: Hostname OUT of sync. Received value: %s",
self.name,
host["host"],
)
self.updateZabbixHost(host=self.name)
# Execute check depending on wether the name is special or not
if self.use_visible_name:
if host["name"] == self.visible_name:
self.logger.debug(f"Host {self.name}: Visible name in-sync.")
self.logger.debug("Host %s: Visible name in-sync.", self.name)
else:
self.logger.info(
f"Host {self.name}: Visible name OUT of sync."
f" Received value: {host['name']}"
"Host %s: Visible name OUT of sync. Received value: %s",
self.name,
host["name"],
)
self.updateZabbixHost(name=self.visible_name)
# Check if the templates are in-sync
if not self.zbx_template_comparer(host["parentTemplates"]):
self.logger.info(f"Host {self.name}: Template(s) OUT of sync.")
self.logger.info("Host %s: Template(s) OUT of sync.", self.name)
# Prepare Templates for API parsing
templateids = []
for template in self.zbx_templates:
@ -703,38 +716,41 @@ class PhysicalDevice:
templates_clear=host["parentTemplates"], templates=templateids
)
else:
self.logger.debug(f"Host {self.name}: Template(s) in-sync.")
self.logger.debug("Host %s: Template(s) in-sync.", self.name)
# Check if Zabbix version is 6 or higher. Issue #93
group_dictname = "hostgroups"
if str(self.zabbix.version).startswith(("6", "5")):
group_dictname = "groups"
# Check if hostgroups match
if (sorted(host[group_dictname], key=itemgetter('groupid')) ==
sorted(self.group_ids, key=itemgetter('groupid'))):
self.logger.debug(f"Host {self.name}: Hostgroups in-sync.")
if sorted(host[group_dictname], key=itemgetter("groupid")) == sorted(
self.group_ids, key=itemgetter("groupid")
):
self.logger.debug("Host %s: Hostgroups in-sync.", self.name)
else:
self.logger.info(f"Host {self.name}: Hostgroups OUT of sync.")
self.logger.info("Host %s: Hostgroups OUT of sync.", self.name)
self.updateZabbixHost(groups=self.group_ids)
if int(host["status"]) == self.zabbix_state:
self.logger.debug(f"Host {self.name}: Status in-sync.")
self.logger.debug("Host %s: Status in-sync.", self.name)
else:
self.logger.info(f"Host {self.name}: Status OUT of sync.")
self.logger.info("Host %s: Status OUT of sync.", self.name)
self.updateZabbixHost(status=str(self.zabbix_state))
# Check if a proxy has been defined
if self.zbxproxy:
# Check if proxy or proxy group is defined
if (self.zbxproxy["idtype"] in host and
host[self.zbxproxy["idtype"]] == self.zbxproxy["id"]):
self.logger.debug(f"Host {self.name}: Proxy in-sync.")
if (
self.zbxproxy["idtype"] in host
and host[self.zbxproxy["idtype"]] == self.zbxproxy["id"]
):
self.logger.debug("Host %s: Proxy in-sync.", self.name)
# Backwards compatibility for Zabbix <= 6
elif "proxy_hostid" in host and host["proxy_hostid"] == self.zbxproxy["id"]:
self.logger.debug(f"Host {self.name}: Proxy in-sync.")
self.logger.debug("Host %s: Proxy in-sync.", self.name)
# Proxy does not match, update Zabbix
else:
self.logger.info(f"Host {self.name}: Proxy OUT of sync.")
self.logger.info("Host %s: Proxy OUT of sync.", self.name)
# Zabbix <= 6 patch
if not str(self.zabbix.version).startswith("7"):
self.updateZabbixHost(proxy_hostid=self.zbxproxy["id"])
@ -757,8 +773,8 @@ class PhysicalDevice:
if proxy_power and proxy_set:
# Zabbix <= 6 fix
self.logger.warning(
f"Host {self.name}: No proxy is configured in NetBox "
"but is configured in Zabbix. Removing proxy config in Zabbix"
"Host %s: No proxy is configured in NetBox but is configured in Zabbix. Removing proxy config in Zabbix",
self.name,
)
if "proxy_hostid" in host and bool(host["proxy_hostid"]):
self.updateZabbixHost(proxy_hostid=0)
@ -772,59 +788,59 @@ class PhysicalDevice:
if proxy_set and not proxy_power:
# Display error message
self.logger.warning(
f"Host {self.name}: Is configured "
f"with proxy in Zabbix but not in NetBox. The"
" -p flag was ommited: no "
"changes have been made."
"Host %s: Is configured with proxy in Zabbix but not in NetBox. The -p flag was ommited: no changes have been made.",
self.name,
)
if not proxy_set:
self.logger.debug(f"Host {self.name}: Proxy in-sync.")
self.logger.debug("Host %s: Proxy in-sync.", self.name)
# Check host inventory mode
if str(host["inventory_mode"]) == str(self.inventory_mode):
self.logger.debug(f"Host {self.name}: inventory_mode in-sync.")
self.logger.debug("Host %s: inventory_mode in-sync.", self.name)
else:
self.logger.info(f"Host {self.name}: inventory_mode OUT of sync.")
self.logger.info("Host %s: inventory_mode OUT of sync.", self.name)
self.updateZabbixHost(inventory_mode=str(self.inventory_mode))
if config["inventory_sync"] and self.inventory_mode in [0, 1]:
# Check host inventory mapping
if host["inventory"] == self.inventory:
self.logger.debug(f"Host {self.name}: Inventory in-sync.")
self.logger.debug("Host %s: Inventory in-sync.", self.name)
else:
self.logger.info(f"Host {self.name}: Inventory OUT of sync.")
self.logger.info("Host %s: Inventory OUT of sync.", self.name)
self.updateZabbixHost(inventory=self.inventory)
# Check host usermacros
if config['usermacro_sync']:
if config["usermacro_sync"]:
# Make a full copy synce we dont want to lose the original value
# of secret type macros from Netbox
netbox_macros = deepcopy(self.usermacros)
# Set the sync bit
full_sync_bit = bool(str(config['usermacro_sync']).lower() == "full")
full_sync_bit = bool(str(config["usermacro_sync"]).lower() == "full")
for macro in netbox_macros:
# If the Macro is a secret and full sync is NOT activated
if macro["type"] == str(1) and not full_sync_bit:
# Remove the value as the Zabbix api does not return the value key
# This is required when you want to do a diff between both lists
macro.pop("value")
# Sort all lists
def filter_with_macros(macro):
return macro["macro"]
host["macros"].sort(key=filter_with_macros)
netbox_macros.sort(key=filter_with_macros)
# Check if both lists are the same
if host["macros"] == netbox_macros:
self.logger.debug(f"Host {self.name}: Usermacros in-sync.")
self.logger.debug("Host %s: Usermacros in-sync.", self.name)
else:
self.logger.info(f"Host {self.name}: Usermacros OUT of sync.")
self.logger.info("Host %s: Usermacros OUT of sync.", self.name)
# Update Zabbix with NetBox usermacros
self.updateZabbixHost(macros=self.usermacros)
# Check host tags
if config['tag_sync']:
if config["tag_sync"]:
if remove_duplicates(host["tags"], sortkey="tag") == self.tags:
self.logger.debug(f"Host {self.name}: Tags in-sync.")
self.logger.debug("Host %s: Tags in-sync.", self.name)
else:
self.logger.info(f"Host {self.name}: Tags OUT of sync.")
self.logger.info("Host %s: Tags OUT of sync.", self.name)
self.updateZabbixHost(tags=self.tags)
# If only 1 interface has been found
@ -862,7 +878,7 @@ class PhysicalDevice:
updates[key] = item
if updates:
# If interface updates have been found: push to Zabbix
self.logger.info(f"Host {self.name}: Interface OUT of sync.")
self.logger.info("Host %s: Interface OUT of sync.", self.name)
if "type" in updates:
# Changing interface type not supported. Raise exception.
e = (
@ -876,26 +892,27 @@ class PhysicalDevice:
try:
# API call to Zabbix
self.zabbix.hostinterface.update(updates)
e = (f"Host {self.name}: Updated interface "
f"with data {sanatize_log_output(updates)}.")
self.logger.info(e)
self.create_journal_entry("info", e)
err_msg = (
f"Host {self.name}: Updated interface "
f"with data {sanatize_log_output(updates)}."
)
self.logger.info(err_msg)
self.create_journal_entry("info", err_msg)
except APIRequestError as e:
msg = f"Zabbix returned the following error: {str(e)}."
self.logger.error(msg)
raise SyncExternalError(msg) from e
else:
# If no updates are found, Zabbix interface is in-sync
e = f"Host {self.name}: Interface in-sync."
self.logger.debug(e)
self.logger.debug("Host %s: Interface in-sync.", self.name)
else:
e = (
err_msg = (
f"Host {self.name}: Has unsupported interface configuration."
f" Host has total of {len(host['interfaces'])} interfaces. "
"Manual intervention required."
)
self.logger.error(e)
raise SyncInventoryError(e)
self.logger.error(err_msg)
raise SyncInventoryError(err_msg)
def create_journal_entry(self, severity, message):
"""
@ -906,7 +923,7 @@ class PhysicalDevice:
# Check if the severity is valid
if severity not in ["info", "success", "warning", "danger"]:
self.logger.warning(
f"Value {severity} not valid for NB journal entries."
"Value %s not valid for NB journal entries.", severity
)
return False
journal = {
@ -917,12 +934,11 @@ class PhysicalDevice:
}
try:
self.nb_journals.create(journal)
self.logger.debug(f"Host {self.name}: Created journal entry in NetBox")
self.logger.debug("Host %s: Created journal entry in NetBox", self.name)
return True
except NetboxRequestError as e:
self.logger.warning(
"Unable to create journal entry for "
f"{self.name}: NB returned {e}"
"Unable to create journal entry for %s: NB returned {e}", self.name
)
return False
return False
@ -947,8 +963,9 @@ class PhysicalDevice:
tmpls_from_zabbix.pop(pos)
succesfull_templates.append(nb_tmpl)
self.logger.debug(
f"Host {self.name}: Template "
f"'{nb_tmpl['name']}' is present in Zabbix."
"Host %s: Template '%s' is present in Zabbix.",
self.name,
nb_tmpl["name"],
)
break
if (

View File

@ -94,8 +94,11 @@ class Hostgroup:
format_options["cluster"] = self.nb.cluster.name
format_options["cluster_type"] = self.nb.cluster.type.name
self.format_options = format_options
self.logger.debug(f"Host {self.name}: Resolved properties for use "
f"in hostgroups: {self.format_options}")
self.logger.debug(
"Host %s: Resolved properties for use in hostgroups: %s",
self.name,
self.format_options,
)
def set_nesting(
self, nested_sitegroup_flag, nested_region_flag, nb_groups, nb_regions
@ -134,7 +137,9 @@ class Hostgroup:
if hostgroup_value:
hg_output.append(hostgroup_value)
else:
self.logger.info(f"Host {self.name}: Used field '{hg_item}' has no value.")
self.logger.info(
"Host %s: Used field '%s' has no value.", self.name, hg_item
)
# Check if the hostgroup is populated with at least one item.
if bool(hg_output):
return "/".join(hg_output)

View File

@ -3,6 +3,7 @@
"""
All of the Zabbix Usermacro related configuration
"""
from logging import getLogger
from modules.tools import field_mapper, remove_duplicates
@ -76,7 +77,7 @@ class ZabbixTags:
else:
tag["tag"] = tag_name
else:
self.logger.warning(f"Tag '{tag_name}' is not a valid tag name, skipping.")
self.logger.warning("Tag '%s' is not a valid tag name, skipping.", tag_name)
return False
if self.validate_value(tag_value):
@ -86,7 +87,7 @@ class ZabbixTags:
tag["value"] = tag_value
else:
self.logger.info(
f"Tag '{tag_name}' has an invalid value: '{tag_value}', skipping."
"Tag '%s' has an invalid value: '%s', skipping.", tag_name, tag_value
)
return False
return tag
@ -99,7 +100,7 @@ class ZabbixTags:
tags = []
# Parse the field mapper for tags
if self.tag_map:
self.logger.debug(f"Host {self.nb.name}: Starting tag mapper.")
self.logger.debug("Host %s: Starting tag mapper.", self.nb.name)
field_tags = field_mapper(self.nb.name, self.tag_map, self.nb, self.logger)
for tag, value in field_tags.items():
t = self.render_tag(tag, value)
@ -131,5 +132,5 @@ class ZabbixTags:
tags.append(t)
tags = remove_duplicates(tags, sortkey="tag")
self.logger.debug(f"Host {self.name}: Resolved tags: {tags}")
self.logger.debug("Host %s: Resolved tags: %s", self.name, tags)
return tags

View File

@ -1,6 +1,8 @@
"""A collection of tools used by several classes"""
from modules.exceptions import HostgroupError
def convert_recordset(recordset):
"""Converts netbox RedcordSet to list of dicts."""
recordlist = []
@ -72,19 +74,22 @@ def field_mapper(host, mapper, nbdevice, logger):
elif not value:
# empty value should just be an empty string for API compatibility
logger.info(
f"Host {host}: NetBox lookup for "
f"'{nb_field}' returned an empty value."
"Host %s: NetBox lookup for '%s' returned an empty value.",
host,
nb_field,
)
data[zbx_field] = ""
else:
# Value is not a string or numeral, probably not what the user expected.
logger.info(
f"Host {host}: Lookup for '{nb_field}'"
" returned an unexpected type: it will be skipped."
"Host %s: Lookup for '%s' returned an unexpected type: it will be skipped.",
host,
nb_field,
)
logger.debug(
f"Host {host}: Field mapping complete. "
f"Mapped {len(list(filter(None, data.values())))} field(s)."
"Host %s: Field mapping complete. Mapped %s field(s).",
host,
len(list(filter(None, data.values()))),
)
return data
@ -101,7 +106,9 @@ def remove_duplicates(input_list, sortkey=None):
return output_list
def verify_hg_format(hg_format, device_cfs=None, vm_cfs=None, hg_type="dev", logger=None):
def verify_hg_format(
hg_format, device_cfs=None, vm_cfs=None, hg_type="dev", logger=None
):
"""
Verifies hostgroup field format
"""
@ -109,44 +116,51 @@ def verify_hg_format(hg_format, device_cfs=None, vm_cfs=None, hg_type="dev", log
device_cfs = []
if not vm_cfs:
vm_cfs = []
allowed_objects = {"dev": ["location",
"rack",
"role",
"manufacturer",
"region",
"site",
"site_group",
"tenant",
"tenant_group",
"platform",
"cluster"]
,"vm": ["cluster_type",
"role",
"manufacturer",
"region",
"site",
"site_group",
"tenant",
"tenant_group",
"cluster",
"device",
"platform"]
,"cfs": {"dev": [], "vm": []}
}
allowed_objects = {
"dev": [
"location",
"rack",
"role",
"manufacturer",
"region",
"site",
"site_group",
"tenant",
"tenant_group",
"platform",
"cluster",
],
"vm": [
"cluster_type",
"role",
"manufacturer",
"region",
"site",
"site_group",
"tenant",
"tenant_group",
"cluster",
"device",
"platform",
],
"cfs": {"dev": [], "vm": []},
}
for cf in device_cfs:
allowed_objects['cfs']['dev'].append(cf.name)
allowed_objects["cfs"]["dev"].append(cf.name)
for cf in vm_cfs:
allowed_objects['cfs']['vm'].append(cf.name)
allowed_objects["cfs"]["vm"].append(cf.name)
hg_objects = []
if isinstance(hg_format,list):
if isinstance(hg_format, list):
for f in hg_format:
hg_objects = hg_objects + f.split("/")
else:
hg_objects = hg_format.split("/")
hg_objects = sorted(set(hg_objects))
for hg_object in hg_objects:
if (hg_object not in allowed_objects[hg_type] and
hg_object not in allowed_objects['cfs'][hg_type]):
if (
hg_object not in allowed_objects[hg_type]
and hg_object not in allowed_objects["cfs"][hg_type]
):
e = (
f"Hostgroup item {hg_object} is not valid. Make sure you"
" use valid items and separate them with '/'."
@ -168,8 +182,7 @@ def sanatize_log_output(data):
if "macros" in data:
for macro in sanitized_data["macros"]:
# Check if macro is secret type
if not (macro["type"] == str(1) or
macro["type"] == 1):
if not (macro["type"] == str(1) or macro["type"] == 1):
continue
macro["value"] = "********"
# Check for interface data

View File

@ -3,6 +3,7 @@
"""
All of the Zabbix Usermacro related configuration
"""
from logging import getLogger
from re import match
@ -57,8 +58,11 @@ class ZabbixUsermacros:
macro["macro"] = str(macro_name)
if isinstance(macro_properties, dict):
if not "value" in macro_properties:
self.logger.info(f"Host {self.name}: Usermacro {macro_name} has "
"no value in Netbox, skipping.")
self.logger.info(
"Host %s: Usermacro %s has no value in Netbox, skipping.",
self.name,
macro_name,
)
return False
macro["value"] = macro_properties["value"]
@ -83,12 +87,17 @@ class ZabbixUsermacros:
macro["description"] = ""
else:
self.logger.info(f"Host {self.name}: Usermacro {macro_name} "
"has no value, skipping.")
self.logger.info(
"Host %s: Usermacro %s has no value, skipping.",
self.name,
macro_name,
)
return False
else:
self.logger.warning(
f"Host {self.name}: Usermacro {macro_name} is not a valid usermacro name, skipping."
"Host %s: Usermacro %s is not a valid usermacro name, skipping.",
self.name,
macro_name,
)
return False
return macro
@ -98,10 +107,10 @@ class ZabbixUsermacros:
Generate full set of Usermacros
"""
macros = []
data={}
data = {}
# Parse the field mapper for usermacros
if self.usermacro_map:
self.logger.debug(f"Host {self.nb.name}: Starting usermacro mapper.")
self.logger.debug("Host %s: Starting usermacro mapper.", self.nb.name)
field_macros = field_mapper(
self.nb.name, self.usermacro_map, self.nb, self.logger
)
@ -120,6 +129,8 @@ class ZabbixUsermacros:
m = self.render_macro(macro, properties)
if m:
macros.append(m)
data={'macros': macros}
self.logger.debug(f"Host {self.name}: Resolved macros: {sanatize_log_output(data)}")
data = {"macros": macros}
self.logger.debug(
"Host %s: Resolved macros: %s", self.name, sanatize_log_output(data)
)
return macros

View File

@ -2,6 +2,7 @@
# pylint: disable=invalid-name, logging-not-lazy, too-many-locals, logging-fstring-interpolation
"""NetBox to Zabbix sync script."""
import argparse
import logging
import ssl
@ -67,15 +68,15 @@ def main(arguments):
try:
# Get NetBox version
nb_version = netbox.version
logger.debug(f"NetBox version is {nb_version}.")
logger.debug("NetBox version is %s.", nb_version)
except RequestsConnectionError:
logger.error(
f"Unable to connect to NetBox with URL {netbox_host}."
" Please check the URL and status of NetBox."
"Unable to connect to NetBox with URL %s. Please check the URL and status of NetBox.",
netbox_host,
)
sys.exit(1)
except NBRequestError as e:
logger.error(f"NetBox error: {e}")
logger.error("NetBox error: %s", e)
sys.exit(1)
# Check if the provided Hostgroup layout is valid
device_cfs = []
@ -83,14 +84,18 @@ def main(arguments):
device_cfs = list(
netbox.extras.custom_fields.filter(type="text", content_types="dcim.device")
)
verify_hg_format(config["hostgroup_format"],
device_cfs=device_cfs, hg_type="dev", logger=logger)
verify_hg_format(
config["hostgroup_format"], device_cfs=device_cfs, hg_type="dev", logger=logger
)
if config["sync_vms"]:
vm_cfs = list(
netbox.extras.custom_fields.filter(type="text",
content_types="virtualization.virtualmachine")
netbox.extras.custom_fields.filter(
type="text", content_types="virtualization.virtualmachine"
)
)
verify_hg_format(
config["vm_hostgroup_format"], vm_cfs=vm_cfs, hg_type="vm", logger=logger
)
verify_hg_format(config["vm_hostgroup_format"], vm_cfs=vm_cfs, hg_type="vm", logger=logger)
# Set Zabbix API
try:
ssl_ctx = ssl.create_default_context()
@ -120,7 +125,8 @@ def main(arguments):
netbox_vms = []
if config["sync_vms"]:
netbox_vms = list(
netbox.virtualization.virtual_machines.filter(**config["nb_vm_filter"]))
netbox.virtualization.virtual_machines.filter(**config["nb_vm_filter"])
)
netbox_site_groups = convert_recordset((netbox.dcim.site_groups.all()))
netbox_regions = convert_recordset(netbox.dcim.regions.all())
netbox_journals = netbox.extras.journal_entries
@ -141,15 +147,22 @@ def main(arguments):
# Go through all NetBox devices
for nb_vm in netbox_vms:
try:
vm = VirtualMachine(nb_vm, zabbix, netbox_journals, nb_version,
config["create_journal"], logger)
logger.debug(f"Host {vm.name}: Started operations on VM.")
vm = VirtualMachine(
nb_vm,
zabbix,
netbox_journals,
nb_version,
config["create_journal"],
logger,
)
logger.debug("Host %s: Started operations on VM.", vm.name)
vm.set_vm_template()
# Check if a valid template has been found for this VM.
if not vm.zbx_template_names:
continue
vm.set_hostgroup(config["vm_hostgroup_format"],
netbox_site_groups, netbox_regions)
vm.set_hostgroup(
config["vm_hostgroup_format"], netbox_site_groups, netbox_regions
)
# Check if a valid hostgroup has been found for this VM.
if not vm.hostgroups:
continue
@ -162,13 +175,12 @@ def main(arguments):
# Delete device from Zabbix
# and remove hostID from NetBox.
vm.cleanup()
logger.info(f"VM {vm.name}: cleanup complete")
logger.info("VM %s: cleanup complete", vm.name)
continue
# Device has been added to NetBox
# but is not in Activate state
logger.info(
f"VM {vm.name}: Skipping since this VM is "
f"not in the active state."
"VM %s: Skipping since this VM is not in the active state.", vm.name
)
continue
# Check if the VM is in the disabled state
@ -200,20 +212,31 @@ def main(arguments):
for nb_device in netbox_devices:
try:
# Set device instance set data such as hostgroup and template information.
device = PhysicalDevice(nb_device, zabbix, netbox_journals, nb_version,
config["create_journal"], logger)
logger.debug(f"Host {device.name}: Started operations on device.")
device.set_template(config["templates_config_context"],
config["templates_config_context_overrule"])
device = PhysicalDevice(
nb_device,
zabbix,
netbox_journals,
nb_version,
config["create_journal"],
logger,
)
logger.debug("Host %s: Started operations on device.", device.name)
device.set_template(
config["templates_config_context"],
config["templates_config_context_overrule"],
)
# Check if a valid template has been found for this VM.
if not device.zbx_template_names:
continue
device.set_hostgroup(
config["hostgroup_format"], netbox_site_groups, netbox_regions)
config["hostgroup_format"], netbox_site_groups, netbox_regions
)
# Check if a valid hostgroup has been found for this VM.
if not device.hostgroups:
logger.warning(f"Host {device.name}: Host has no valid "
f"hostgroups, Skipping this host...")
logger.warning(
"Host %s: Host has no valid hostgroups, Skipping this host...",
device.name,
)
continue
device.set_inventory(nb_device)
device.set_usermacros()
@ -223,16 +246,16 @@ def main(arguments):
if device.isCluster() and config["clustering"]:
# Check if device is primary or secondary
if device.promoteMasterDevice():
e = f"Device {device.name}: is " f"part of cluster and primary."
logger.info(e)
logger.info(
"Device %s: is part of cluster and primary.", device.name
)
else:
# Device is secondary in cluster.
# Don't continue with this device.
e = (
f"Device {device.name}: Is part of cluster "
f"but not primary. Skipping this host..."
logger.info(
"Device %s: Is part of cluster but not primary. Skipping this host...",
device.name,
)
logger.info(e)
continue
# Checks if device is in cleanup state
if device.status in config["zabbix_device_removal"]:
@ -240,13 +263,13 @@ def main(arguments):
# Delete device from Zabbix
# and remove hostID from NetBox.
device.cleanup()
logger.info(f"Device {device.name}: cleanup complete")
logger.info("Device %s: cleanup complete", device.name)
continue
# Device has been added to NetBox
# but is not in Activate state
logger.info(
f"Device {device.name}: Skipping since this device is "
f"not in the active state."
"Device %s: Skipping since this device is not in the active state.",
device.name,
)
continue
# Check if the device is in the disabled state