🚨 Method naming convention enforcement and formatting improvements

This commit is contained in:
Wouter de Bruijn 2025-06-17 10:33:47 +02:00
parent e1f3911046
commit 8f658d3bf8
No known key found for this signature in database
GPG Key ID: AC71F96733B92BFA
9 changed files with 184 additions and 360 deletions

View File

@ -21,8 +21,13 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pylint
pip install ruff mypy bandit
pip install -r requirements.txt
- name: Analysing the code with pylint
run: |
pylint --module-naming-style=any modules/* netbox_zabbix_sync.py
- name: Run Ruff (Lint) (Replacement for flake8, isort)
run: ruff check netbox_zabbix_sync.py modules/* tests/*
- name: Run Ruff (Format Check - no changes) (Replacement for black)
run: ruff format --check .
- name: Run Mypy
run: mypy netbox_zabbix_sync.py modules/*
- name: Run Bandit
run: bandit -r . -c pyproject.toml # Or relevant config

View File

@ -34,9 +34,7 @@ class PhysicalDevice:
INPUT: (NetBox device class, ZabbixAPI class, journal flag, NB journal class)
"""
def __init__(
self, nb, zabbix, nb_journal_class, nb_version, journal=None, logger=None
):
def __init__(self, nb, zabbix, nb_journal_class, nb_version, journal=None, logger=None):
self.nb = nb
self.id = nb.id
self.name = nb.name
@ -60,7 +58,7 @@ class PhysicalDevice:
self.usermacros = []
self.tags = {}
self.logger = logger if logger else getLogger(__name__)
self._setBasics()
self._set_basics()
def __repr__(self):
return self.name
@ -80,7 +78,7 @@ class PhysicalDevice:
"""Use device host tag maps"""
return config["device_tag_map"]
def _setBasics(self):
def _set_basics(self):
"""
Sets basic information like IP address.
"""
@ -104,17 +102,11 @@ class PhysicalDevice:
# Validate hostname format.
odd_character_list = ["ä", "ö", "ü", "Ä", "Ö", "Ü", "ß"]
self.use_visible_name = False
if any(letter in self.name for letter in odd_character_list) or bool(
search("[\u0400-\u04ff]", self.name)
):
if any(letter in self.name for letter in odd_character_list) or bool(search("[\u0400-\u04ff]", self.name)):
self.name = f"NETBOX_ID{self.id}"
self.visible_name = self.nb.name
self.use_visible_name = True
self.logger.info(
f"Host {self.visible_name} contains special characters. "
f"Using {self.name} as name for the NetBox object "
f"and using {self.visible_name} as visible name in Zabbix."
)
self.logger.info(f"Host {self.visible_name} contains special characters. Using {self.name} as name for the NetBox object and using {self.visible_name} as visible name in Zabbix.")
else:
pass
@ -170,27 +162,17 @@ class PhysicalDevice:
# Set value to template
return [device_type_cfs[config["template_cf"]]]
# Custom field not found, return error
e = (
f"Custom field {config['template_cf']} not "
f"found for {self.nb.device_type.manufacturer.name}"
f" - {self.nb.device_type.display}."
)
e = f"Custom field {config['template_cf']} not found for {self.nb.device_type.manufacturer.name} - {self.nb.device_type.display}."
self.logger.warning(e)
raise TemplateError(e)
def get_templates_context(self):
"""Get Zabbix templates from the device context"""
if "zabbix" not in self.config_context:
e = (
f"Host {self.name}: Key 'zabbix' not found in config "
"context for template lookup"
)
e = f"Host {self.name}: Key 'zabbix' not found in config context for template lookup"
raise TemplateError(e)
if "templates" not in self.config_context["zabbix"]:
e = (
f"Host {self.name}: Key 'templates' not found in config "
"context 'zabbix' for template lookup"
)
e = f"Host {self.name}: Key 'templates' not found in config context 'zabbix' for template lookup"
raise TemplateError(e)
# Check if format is list or string.
if isinstance(self.config_context["zabbix"]["templates"], str):
@ -202,75 +184,56 @@ class PhysicalDevice:
# Set inventory mode. Default is disabled (see class init function).
if config["inventory_mode"] == "disabled":
if config["inventory_sync"]:
self.logger.error(
f"Host {self.name}: Unable to map NetBox inventory to Zabbix. "
"Inventory sync is enabled in "
"config but inventory mode is disabled."
)
self.logger.error(f"Host {self.name}: Unable to map NetBox inventory to Zabbix. Inventory sync is enabled in config but inventory mode is disabled.")
return True
if config["inventory_mode"] == "manual":
self.inventory_mode = 0
elif config["inventory_mode"] == "automatic":
self.inventory_mode = 1
else:
self.logger.error(
f"Host {self.name}: Specified value for inventory mode in"
f" config is not valid. Got value {config['inventory_mode']}"
)
self.logger.error(f"Host {self.name}: Specified value for inventory mode in config is not valid. Got value {config['inventory_mode']}")
return False
self.inventory = {}
if config["inventory_sync"] and self.inventory_mode in [0, 1]:
self.logger.debug(f"Host {self.name}: Starting inventory mapper")
self.inventory = field_mapper(
self.name, self._inventory_map(), nbdevice, self.logger
)
self.inventory = field_mapper(self.name, self._inventory_map(), nbdevice, self.logger)
return True
def isCluster(self):
def is_cluster(self):
"""
Checks if device is part of cluster.
"""
return bool(self.nb.virtual_chassis)
def getClusterMaster(self):
def get_cluster_master(self):
"""
Returns chassis master ID.
"""
if not self.isCluster():
e = (
f"Unable to proces {self.name} for cluster calculation: "
f"not part of a cluster."
)
if not self.is_cluster():
e = f"Unable to proces {self.name} for cluster calculation: not part of a cluster."
self.logger.warning(e)
raise SyncInventoryError(e)
if not self.nb.virtual_chassis.master:
e = (
f"{self.name} is part of a NetBox virtual chassis which does "
"not have a master configured. Skipping for this reason."
)
e = f"{self.name} is part of a NetBox virtual chassis which does not have a master configured. Skipping for this reason."
self.logger.error(e)
raise SyncInventoryError(e)
return self.nb.virtual_chassis.master.id
def promoteMasterDevice(self):
def promote_master_device(self):
"""
If device is Primary in cluster,
promote device name to the cluster name.
Returns True if succesfull, returns False if device is secondary.
"""
masterid = self.getClusterMaster()
masterid = self.get_cluster_master()
if masterid == self.id:
self.logger.debug(
f"Host {self.name} is primary cluster member. "
f"Modifying hostname from {self.name} to "
+ f"{self.nb.virtual_chassis.name}."
)
self.logger.debug(f"Host {self.name} is primary cluster member. Modifying hostname from {self.name} to " + f"{self.nb.virtual_chassis.name}.")
self.name = self.nb.virtual_chassis.name
return True
self.logger.debug(f"Host {self.name} is non-primary cluster member.")
return False
def zbxTemplatePrepper(self, templates):
def zbx_template_prepper(self, templates):
"""
Returns Zabbix template IDs
INPUT: list of templates from Zabbix
@ -303,14 +266,11 @@ class PhysicalDevice:
self.logger.debug(e)
# Return error should the template not be found in Zabbix
if not template_match:
e = (
f"Unable to find template {nb_template} "
f"for host {self.name} in Zabbix. Skipping host..."
)
e = f"Unable to find template {nb_template} for host {self.name} in Zabbix. Skipping host..."
self.logger.warning(e)
raise SyncInventoryError(e)
def setZabbixGroupID(self, groups):
def set_zabbix_group_id(self, groups):
"""
Sets Zabbix group ID as instance variable
INPUT: list of hostgroups
@ -321,10 +281,7 @@ class PhysicalDevice:
for group in groups:
if group["name"] == hg:
self.group_ids.append({"groupid": group["groupid"]})
e = (
f"Host {self.name}: matched group "
f'"{group["name"]}" (ID:{group["groupid"]})'
)
e = f'Host {self.name}: matched group "{group["name"]}" (ID:{group["groupid"]})'
self.logger.debug(e)
if len(self.group_ids) == len(self.hostgroups):
return True
@ -338,13 +295,8 @@ class PhysicalDevice:
if self.zabbix_id:
try:
# Check if the Zabbix host exists in Zabbix
zbx_host = bool(
self.zabbix.host.get(filter={"hostid": self.zabbix_id}, output=[])
)
e = (
f"Host {self.name}: was already deleted from Zabbix."
" Removed link in NetBox."
)
zbx_host = bool(self.zabbix.host.get(filter={"hostid": self.zabbix_id}, output=[]))
e = f"Host {self.name}: was already deleted from Zabbix. Removed link in NetBox."
if zbx_host:
# Delete host should it exists
self.zabbix.host.delete(self.zabbix_id)
@ -363,7 +315,7 @@ class PhysicalDevice:
self.nb.custom_fields[config["device_cf"]] = None
self.nb.save()
def _zabbixHostnameExists(self):
def _zabbix_hostname_exists(self):
"""
Checks if hostname exists in Zabbix.
"""
@ -375,7 +327,7 @@ class PhysicalDevice:
host = self.zabbix.host.get(filter=zbx_filter, output=[])
return bool(host)
def setInterfaceDetails(self):
def set_interface_details(self):
"""
Checks interface parameters from NetBox and
creates a model for the interface to be used in Zabbix.
@ -435,7 +387,7 @@ class PhysicalDevice:
self.tags = tags.generate()
return True
def setProxy(self, proxy_list):
def set_proxy(self, proxy_list):
"""
Sets proxy or proxy group if this
value has been defined in config context
@ -445,10 +397,7 @@ class PhysicalDevice:
# check if the key Zabbix is defined in the config context
if "zabbix" not in self.nb.config_context:
return False
if (
"proxy" in self.nb.config_context["zabbix"]
and not self.nb.config_context["zabbix"]["proxy"]
):
if "proxy" in self.nb.config_context["zabbix"] and not self.nb.config_context["zabbix"]["proxy"]:
return False
# Proxy group takes priority over a proxy due
# to it being HA and therefore being more reliable
@ -468,17 +417,13 @@ class PhysicalDevice:
continue
# If the proxy name matches
if proxy["name"] == proxy_name:
self.logger.debug(
f"Host {self.name}: using {proxy['type']} {proxy_name}"
)
self.logger.debug(f"Host {self.name}: using {proxy['type']} {proxy_name}")
self.zbxproxy = proxy
return True
self.logger.warning(
f"Host {self.name}: unable to find proxy {proxy_name}"
)
self.logger.warning(f"Host {self.name}: unable to find proxy {proxy_name}")
return False
def createInZabbix(
def create_in_zabbix(
self,
groups,
templates,
@ -489,24 +434,21 @@ class PhysicalDevice:
Creates Zabbix host object with parameters from NetBox object.
"""
# Check if hostname is already present in Zabbix
if not self._zabbixHostnameExists():
if not self._zabbix_hostname_exists():
# Set group and template ID's for host
if not self.setZabbixGroupID(groups):
e = (
f"Unable to find group '{self.hostgroup}' "
f"for host {self.name} in Zabbix."
)
if not self.set_zabbix_group_id(groups):
e = f"Unable to find group '{self.hostgroup}' for host {self.name} in Zabbix."
self.logger.warning(e)
raise SyncInventoryError(e)
self.zbxTemplatePrepper(templates)
self.zbx_template_prepper(templates)
templateids = []
for template in self.zbx_templates:
templateids.append({"templateid": template["templateid"]})
# Set interface, group and template configuration
interfaces = self.setInterfaceDetails()
interfaces = self.set_interface_details()
groups = self.group_ids
# Set Zabbix proxy if defined
self.setProxy(proxies)
self.set_proxy(proxies)
# Set basic data for host creation
create_data = {
"host": self.name,
@ -546,11 +488,9 @@ class PhysicalDevice:
self.logger.info(msg)
self.create_journal_entry("success", msg)
else:
self.logger.error(
f"Host {self.name}: Unable to add to Zabbix. Host already present."
)
self.logger.error(f"Host {self.name}: Unable to add to Zabbix. Host already present.")
def createZabbixHostgroup(self, hostgroups):
def create_zabbix_hostgroup(self, hostgroups):
"""
Creates Zabbix host group based on hostgroup format.
Creates multiple when using a nested format.
@ -560,7 +500,7 @@ class PhysicalDevice:
for hostgroup in self.hostgroups:
for pos in range(len(hostgroup.split("/"))):
zabbix_hg = hostgroup.rsplit("/", pos)[0]
if self.lookupZabbixHostgroup(hostgroups, zabbix_hg):
if self.lookup_zabbix_hostgroup(hostgroups, zabbix_hg):
# Hostgroup already exists
continue
# Create new group
@ -570,16 +510,14 @@ class PhysicalDevice:
e = f"Hostgroup '{zabbix_hg}': created in Zabbix."
self.logger.info(e)
# Add group to final data
final_data.append(
{"groupid": groupid["groupids"][0], "name": zabbix_hg}
)
final_data.append({"groupid": groupid["groupids"][0], "name": zabbix_hg})
except APIRequestError as e:
msg = f"Hostgroup '{zabbix_hg}': unable to create. Zabbix returned {str(e)}."
self.logger.error(msg)
raise SyncExternalError(msg) from e
return final_data
def lookupZabbixHostgroup(self, group_list, lookup_group):
def lookup_zabbix_hostgroup(self, group_list, lookup_group):
"""
Function to check if a hostgroup
exists in a list of Zabbix hostgroups
@ -591,7 +529,7 @@ class PhysicalDevice:
return True
return False
def updateZabbixHost(self, **kwargs):
def update_zabbix_host(self, **kwargs):
"""
Updates Zabbix host with given parameters.
INPUT: Key word arguments for Zabbix host object.
@ -599,48 +537,38 @@ class PhysicalDevice:
try:
self.zabbix.host.update(hostid=self.zabbix_id, **kwargs)
except APIRequestError as e:
e = (
f"Host {self.name}: Unable to update. "
f"Zabbix returned the following error: {str(e)}."
)
e = f"Host {self.name}: Unable to update. Zabbix returned the following error: {str(e)}."
self.logger.error(e)
raise SyncExternalError(e) from None
self.logger.info(
f"Host {self.name}: updated with data {sanatize_log_output(kwargs)}."
)
self.logger.info(f"Host {self.name}: updated with data {sanatize_log_output(kwargs)}.")
self.create_journal_entry("info", "Updated host in Zabbix with latest NB data.")
def ConsistencyCheck(
self, groups, templates, proxies, proxy_power, create_hostgroups
):
def consistency_check(self, groups, templates, proxies, proxy_power, create_hostgroups):
# pylint: disable=too-many-branches, too-many-statements
"""
Checks if Zabbix object is still valid with NetBox parameters.
"""
# If group is found or if the hostgroup is nested
if not self.setZabbixGroupID(groups): # or len(self.hostgroups.split("/")) > 1:
if not self.set_zabbix_group_id(groups): # or len(self.hostgroups.split("/")) > 1:
if create_hostgroups:
# Script is allowed to create a new hostgroup
new_groups = self.createZabbixHostgroup(groups)
new_groups = self.create_zabbix_hostgroup(groups)
for group in new_groups:
# Add all new groups to the list of groups
groups.append(group)
# check if the initial group was not already found (and this is a nested folder check)
if not self.group_ids:
# Function returns true / false but also sets GroupID
if not self.setZabbixGroupID(groups) and not create_hostgroups:
e = (
f"Host {self.name}: different hostgroup is required but "
"unable to create hostgroup without generation permission."
)
if not self.set_zabbix_group_id(groups) and not create_hostgroups:
e = f"Host {self.name}: different hostgroup is required but unable to create hostgroup without generation permission."
self.logger.warning(e)
raise SyncInventoryError(e)
# if self.group_ids:
# self.group_ids.append(self.pri_group_id)
# Prepare templates and proxy config
self.zbxTemplatePrepper(templates)
self.setProxy(proxies)
self.zbx_template_prepper(templates)
self.set_proxy(proxies)
# Get host object from Zabbix
host = self.zabbix.host.get(
filter={"hostid": self.zabbix_id},
@ -653,40 +581,27 @@ class PhysicalDevice:
selectTags=["tag", "value"],
)
if len(host) > 1:
e = (
f"Got {len(host)} results for Zabbix hosts "
f"with ID {self.zabbix_id} - hostname {self.name}."
)
e = f"Got {len(host)} results for Zabbix hosts with ID {self.zabbix_id} - hostname {self.name}."
self.logger.error(e)
raise SyncInventoryError(e)
if len(host) == 0:
e = (
f"Host {self.name}: No Zabbix host found. "
f"This is likely the result of a deleted Zabbix host "
f"without zeroing the ID field in NetBox."
)
e = f"Host {self.name}: No Zabbix host found. This is likely the result of a deleted Zabbix host without zeroing the ID field in NetBox."
self.logger.error(e)
raise SyncInventoryError(e)
host = host[0]
if host["host"] == self.name:
self.logger.debug(f"Host {self.name}: hostname in-sync.")
else:
self.logger.warning(
f"Host {self.name}: hostname OUT of sync. "
f"Received value: {host['host']}"
)
self.updateZabbixHost(host=self.name)
self.logger.warning(f"Host {self.name}: hostname OUT of sync. Received value: {host['host']}")
self.update_zabbix_host(host=self.name)
# Execute check depending on wether the name is special or not
if self.use_visible_name:
if host["name"] == self.visible_name:
self.logger.debug(f"Host {self.name}: visible name in-sync.")
else:
self.logger.warning(
f"Host {self.name}: visible name OUT of sync."
f" Received value: {host['name']}"
)
self.updateZabbixHost(name=self.visible_name)
self.logger.warning(f"Host {self.name}: visible name OUT of sync. Received value: {host['name']}")
self.update_zabbix_host(name=self.visible_name)
# Check if the templates are in-sync
if not self.zbx_template_comparer(host["parentTemplates"]):
@ -696,9 +611,7 @@ class PhysicalDevice:
for template in self.zbx_templates:
templateids.append({"templateid": template["templateid"]})
# Update Zabbix with NB templates and clear any old / lost templates
self.updateZabbixHost(
templates_clear=host["parentTemplates"], templates=templateids
)
self.update_zabbix_host(templates_clear=host["parentTemplates"], templates=templateids)
else:
self.logger.debug(f"Host {self.name}: template(s) in-sync.")
@ -707,27 +620,22 @@ class PhysicalDevice:
if str(self.zabbix.version).startswith(("6", "5")):
group_dictname = "groups"
# Check if hostgroups match
if sorted(host[group_dictname], key=itemgetter("groupid")) == sorted(
self.group_ids, key=itemgetter("groupid")
):
if sorted(host[group_dictname], key=itemgetter("groupid")) == sorted(self.group_ids, key=itemgetter("groupid")):
self.logger.debug(f"Host {self.name}: hostgroups in-sync.")
else:
self.logger.warning(f"Host {self.name}: hostgroups OUT of sync.")
self.updateZabbixHost(groups=self.group_ids)
self.update_zabbix_host(groups=self.group_ids)
if int(host["status"]) == self.zabbix_state:
self.logger.debug(f"Host {self.name}: status in-sync.")
else:
self.logger.warning(f"Host {self.name}: status OUT of sync.")
self.updateZabbixHost(status=str(self.zabbix_state))
self.update_zabbix_host(status=str(self.zabbix_state))
# Check if a proxy has been defined
if self.zbxproxy:
# Check if proxy or proxy group is defined
if (
self.zbxproxy["idtype"] in host
and host[self.zbxproxy["idtype"]] == self.zbxproxy["id"]
):
if self.zbxproxy["idtype"] in host and host[self.zbxproxy["idtype"]] == self.zbxproxy["id"]:
self.logger.debug(f"Host {self.name}: proxy in-sync.")
# Backwards compatibility for Zabbix <= 6
elif "proxy_hostid" in host and host["proxy_hostid"] == self.zbxproxy["id"]:
@ -737,7 +645,7 @@ class PhysicalDevice:
self.logger.warning(f"Host {self.name}: proxy OUT of sync.")
# Zabbix <= 6 patch
if not str(self.zabbix.version).startswith("7"):
self.updateZabbixHost(proxy_hostid=self.zbxproxy["id"])
self.update_zabbix_host(proxy_hostid=self.zbxproxy["id"])
# Zabbix 7+
else:
# Prepare data structure for updating either proxy or group
@ -745,7 +653,7 @@ class PhysicalDevice:
self.zbxproxy["idtype"]: self.zbxproxy["id"],
"monitored_by": self.zbxproxy["monitored_by"],
}
self.updateZabbixHost(**update_data)
self.update_zabbix_host(**update_data)
else:
# No proxy is defined in NetBox
proxy_set = False
@ -756,27 +664,19 @@ class PhysicalDevice:
proxy_set = True
if proxy_power and proxy_set:
# Zabbix <= 6 fix
self.logger.warning(
f"Host {self.name}: no proxy is configured in NetBox "
"but is configured in Zabbix. Removing proxy config in Zabbix"
)
self.logger.warning(f"Host {self.name}: no proxy is configured in NetBox but is configured in Zabbix. Removing proxy config in Zabbix")
if "proxy_hostid" in host and bool(host["proxy_hostid"]):
self.updateZabbixHost(proxy_hostid=0)
self.update_zabbix_host(proxy_hostid=0)
# Zabbix 7 proxy
elif "proxyid" in host and bool(host["proxyid"]):
self.updateZabbixHost(proxyid=0, monitored_by=0)
self.update_zabbix_host(proxyid=0, monitored_by=0)
# Zabbix 7 proxy group
elif "proxy_groupid" in host and bool(host["proxy_groupid"]):
self.updateZabbixHost(proxy_groupid=0, monitored_by=0)
self.update_zabbix_host(proxy_groupid=0, monitored_by=0)
# Checks if a proxy has been defined in Zabbix and if proxy_power config has been set
if proxy_set and not proxy_power:
# Display error message
self.logger.error(
f"Host {self.name} is configured "
f"with proxy in Zabbix but not in NetBox. The"
" -p flag was ommited: no "
"changes have been made."
)
self.logger.error(f"Host {self.name} is configured with proxy in Zabbix but not in NetBox. The -p flag was ommited: no changes have been made.")
if not proxy_set:
self.logger.debug(f"Host {self.name}: proxy in-sync.")
# Check host inventory mode
@ -784,14 +684,14 @@ class PhysicalDevice:
self.logger.debug(f"Host {self.name}: inventory_mode in-sync.")
else:
self.logger.warning(f"Host {self.name}: inventory_mode OUT of sync.")
self.updateZabbixHost(inventory_mode=str(self.inventory_mode))
self.update_zabbix_host(inventory_mode=str(self.inventory_mode))
if config["inventory_sync"] and self.inventory_mode in [0, 1]:
# Check host inventory mapping
if host["inventory"] == self.inventory:
self.logger.debug(f"Host {self.name}: inventory in-sync.")
else:
self.logger.warning(f"Host {self.name}: inventory OUT of sync.")
self.updateZabbixHost(inventory=self.inventory)
self.update_zabbix_host(inventory=self.inventory)
# Check host usermacros
if config["usermacro_sync"]:
@ -819,7 +719,7 @@ class PhysicalDevice:
else:
self.logger.warning(f"Host {self.name}: usermacros OUT of sync.")
# Update Zabbix with NetBox usermacros
self.updateZabbixHost(macros=self.usermacros)
self.update_zabbix_host(macros=self.usermacros)
# Check host tags
if config["tag_sync"]:
@ -827,14 +727,14 @@ class PhysicalDevice:
self.logger.debug(f"Host {self.name}: tags in-sync.")
else:
self.logger.warning(f"Host {self.name}: tags OUT of sync.")
self.updateZabbixHost(tags=self.tags)
self.update_zabbix_host(tags=self.tags)
# If only 1 interface has been found
# pylint: disable=too-many-nested-blocks
if len(host["interfaces"]) == 1:
updates = {}
# Go through each key / item and check if it matches Zabbix
for key, item in self.setInterfaceDetails()[0].items():
for key, item in self.set_interface_details()[0].items():
# Check if NetBox value is found in Zabbix
if key in host["interfaces"][0]:
# If SNMP is used, go through nested dict
@ -867,10 +767,7 @@ class PhysicalDevice:
self.logger.warning(f"Host {self.name}: Interface OUT of sync.")
if "type" in updates:
# Changing interface type not supported. Raise exception.
e = (
f"Host {self.name}: changing interface type to "
f"{str(updates['type'])} is not supported."
)
e = f"Host {self.name}: changing interface type to {str(updates['type'])} is not supported."
self.logger.error(e)
raise InterfaceConfigError(e)
# Set interfaceID for Zabbix config
@ -878,10 +775,7 @@ class PhysicalDevice:
try:
# API call to Zabbix
self.zabbix.hostinterface.update(updates)
e = (
f"Host {self.name}: updated interface "
f"with data {sanatize_log_output(updates)}."
)
e = f"Host {self.name}: updated interface with data {sanatize_log_output(updates)}."
self.logger.info(e)
self.create_journal_entry("info", e)
except APIRequestError as e:
@ -892,11 +786,7 @@ class PhysicalDevice:
# If no updates are found, Zabbix interface is in-sync
self.logger.debug("Host %s: interface in-sync.", self.name)
else:
error_message = (
f"Host {self.name} has unsupported interface configuration."
f" Host has total of {len(host['interfaces'])} interfaces. "
"Manual intervention required."
)
error_message = f"Host {self.name} has unsupported interface configuration. Host has total of {len(host['interfaces'])} interfaces. Manual intervention required."
self.logger.error(error_message)
raise SyncInventoryError(error_message)
@ -908,9 +798,7 @@ class PhysicalDevice:
if self.journal:
# Check if the severity is valid
if severity not in ["info", "success", "warning", "danger"]:
self.logger.warning(
f"Value {severity} not valid for NB journal entries."
)
self.logger.warning(f"Value {severity} not valid for NB journal entries.")
return False
journal = {
"assigned_object_type": "dcim.device",
@ -923,9 +811,7 @@ class PhysicalDevice:
self.logger.debug(f"Host {self.name}: Created journal entry in NetBox")
return True
except NetboxRequestError as e:
self.logger.warning(
f"Unable to create journal entry for {self.name}: NB returned {e}"
)
self.logger.warning(f"Unable to create journal entry for {self.name}: NB returned {e}")
return False
return False
@ -948,15 +834,9 @@ class PhysicalDevice:
# and add this NB template to the list of successfull templates
tmpls_from_zabbix.pop(pos)
succesfull_templates.append(nb_tmpl)
self.logger.debug(
f"Host {self.name}: template "
f"{nb_tmpl['name']} is present in Zabbix."
)
self.logger.debug(f"Host {self.name}: template {nb_tmpl['name']} is present in Zabbix.")
break
if (
len(succesfull_templates) == len(self.zbx_templates)
and len(tmpls_from_zabbix) == 0
):
if len(succesfull_templates) == len(self.zbx_templates) and len(tmpls_from_zabbix) == 0:
# All of the NetBox templates have been confirmed as successfull
# and the ZBX template list is empty. This means that
# all of the templates match.

View File

@ -32,9 +32,7 @@ class Hostgroup:
self.name = self.nb.name
self.nb_version = version
# Used for nested data objects
self.set_nesting(
nested_sitegroup_flag, nested_region_flag, nb_groups, nb_regions
)
self.set_nesting(nested_sitegroup_flag, nested_region_flag, nb_groups, nb_regions)
self._set_format_options()
def __str__(self):
@ -63,28 +61,18 @@ class Hostgroup:
format_options["site_group"] = None
if self.nb.site:
if self.nb.site.region:
format_options["region"] = self.generate_parents(
"region", str(self.nb.site.region)
)
format_options["region"] = self.generate_parents("region", str(self.nb.site.region))
if self.nb.site.group:
format_options["site_group"] = self.generate_parents(
"site_group", str(self.nb.site.group)
)
format_options["site_group"] = self.generate_parents("site_group", str(self.nb.site.group))
format_options["role"] = role
format_options["site"] = self.nb.site.name if self.nb.site else None
format_options["tenant"] = str(self.nb.tenant) if self.nb.tenant else None
format_options["tenant_group"] = (
str(self.nb.tenant.group) if self.nb.tenant else None
)
format_options["platform"] = (
self.nb.platform.name if self.nb.platform else None
)
format_options["tenant_group"] = str(self.nb.tenant.group) if self.nb.tenant else None
format_options["platform"] = self.nb.platform.name if self.nb.platform else None
# Variables only applicable for devices
if self.type == "dev":
format_options["manufacturer"] = self.nb.device_type.manufacturer.name
format_options["location"] = (
str(self.nb.location) if self.nb.location else None
)
format_options["location"] = str(self.nb.location) if self.nb.location else None
format_options["rack"] = self.nb.rack.name if self.nb.rack else None
# Variables only applicable for VM's
if self.type == "vm":
@ -94,9 +82,7 @@ class Hostgroup:
format_options["cluster_type"] = self.nb.cluster.type.name
self.format_options = format_options
def set_nesting(
self, nested_sitegroup_flag, nested_region_flag, nb_groups, nb_regions
):
def set_nesting(self, nested_sitegroup_flag, nested_region_flag, nb_groups, nb_regions):
"""Set nesting options for this Hostgroup"""
self.nested_objects = {
"site_group": {"flag": nested_sitegroup_flag, "data": nb_groups},
@ -107,9 +93,7 @@ class Hostgroup:
"""Generate hostgroup based on a provided format"""
# Set format to default in case its not specified
if not hg_format:
hg_format = (
"site/manufacturer/role" if self.type == "dev" else "cluster/role"
)
hg_format = "site/manufacturer/role" if self.type == "dev" else "cluster/role"
# Split all given names
hg_output = []
hg_items = hg_format.split("/")
@ -120,10 +104,7 @@ class Hostgroup:
cf_data = self.custom_field_lookup(hg_item)
# CF does not exist
if not cf_data["result"]:
msg = (
f"Unable to generate hostgroup for host {self.name}. "
f"Item type {hg_item} not supported."
)
msg = f"Unable to generate hostgroup for host {self.name}. Item type {hg_item} not supported."
self.logger.error(msg)
raise HostgroupError(msg)
# CF data is populated
@ -138,20 +119,18 @@ class Hostgroup:
# Check if the hostgroup is populated with at least one item.
if bool(hg_output):
return "/".join(hg_output)
msg = (
f"Unable to generate hostgroup for host {self.name}."
" Not enough valid items. This is most likely"
" due to the use of custom fields that are empty"
" or an invalid hostgroup format."
)
msg = f"Unable to generate hostgroup for host {self.name}. Not enough valid items. This is most likely due to the use of custom fields that are empty or an invalid hostgroup format."
self.logger.error(msg)
raise HostgroupError(msg)
def list_formatoptions(self):
def list_format_options(self):
"""
Function to easily troubleshoot which values
are generated for a specific device or VM.
"""
# TODO @TheNetworkGuy: Change print statements to appropriate logging calls
print(f"The following options are available for host {self.name}")
for option_type, value in self.format_options.items():
if value is not None:
@ -188,9 +167,7 @@ class Hostgroup:
return child_object
# If the nested flag is True, perform parent calculation
if self.nested_objects[nest_type]["flag"]:
final_nested_object = build_path(
child_object, self.nested_objects[nest_type]["data"]
)
final_nested_object = build_path(child_object, self.nested_objects[nest_type]["data"])
return "/".join(final_nested_object)
# Nesting is not allowed for this object. Return child_object
return child_object

View File

@ -60,7 +60,7 @@ class VirtualMachine(PhysicalDevice):
self.logger.warning(e)
return True
def setInterfaceDetails(self): # pylint: disable=invalid-name
def set_interface_details(self): # pylint: disable=invalid-name
"""
Overwrites device function to select an agent interface type by default
Agent type interfaces are more likely to be used with VMs then SNMP

View File

@ -6,8 +6,8 @@
import argparse
import logging
import ssl
from os import environ
import sys
from os import environ
from pynetbox import api
from pynetbox.core.query import RequestError as NBRequestError
@ -72,10 +72,7 @@ def main(arguments):
nb_version = netbox.version
logger.debug(f"NetBox version is {nb_version}.")
except RequestsConnectionError:
logger.error(
f"Unable to connect to NetBox with URL {netbox_host}."
" Please check the URL and status of NetBox."
)
logger.error(f"Unable to connect to NetBox with URL {netbox_host}. Please check the URL and status of NetBox.")
sys.exit(1)
except NBRequestError as e:
logger.error(f"NetBox error: {e}")
@ -83,21 +80,11 @@ def main(arguments):
# Check if the provided Hostgroup layout is valid
device_cfs = []
vm_cfs = []
device_cfs = list(
netbox.extras.custom_fields.filter(type="text", content_types="dcim.device")
)
verify_hg_format(
config["hostgroup_format"], device_cfs=device_cfs, hg_type="dev", logger=logger
)
device_cfs = list(netbox.extras.custom_fields.filter(type="text", content_types="dcim.device"))
verify_hg_format(config["hostgroup_format"], device_cfs=device_cfs, hg_type="dev", logger=logger)
if config["sync_vms"]:
vm_cfs = list(
netbox.extras.custom_fields.filter(
type="text", content_types="virtualization.virtualmachine"
)
)
verify_hg_format(
config["vm_hostgroup_format"], vm_cfs=vm_cfs, hg_type="vm", logger=logger
)
vm_cfs = list(netbox.extras.custom_fields.filter(type="text", content_types="virtualization.virtualmachine"))
verify_hg_format(config["vm_hostgroup_format"], vm_cfs=vm_cfs, hg_type="vm", logger=logger)
# Set Zabbix API
try:
ssl_ctx = ssl.create_default_context()
@ -107,9 +94,7 @@ def main(arguments):
ssl_ctx.load_verify_locations(environ["REQUESTS_CA_BUNDLE"])
if not zabbix_token:
zabbix = ZabbixAPI(
zabbix_host, user=zabbix_user, password=zabbix_pass, ssl_context=ssl_ctx
)
zabbix = ZabbixAPI(zabbix_host, user=zabbix_user, password=zabbix_pass, ssl_context=ssl_ctx)
else:
zabbix = ZabbixAPI(zabbix_host, token=zabbix_token, ssl_context=ssl_ctx)
zabbix.check_auth()
@ -126,9 +111,7 @@ def main(arguments):
netbox_devices = list(netbox.dcim.devices.filter(**config["nb_device_filter"]))
netbox_vms = []
if config["sync_vms"]:
netbox_vms = list(
netbox.virtualization.virtual_machines.filter(**config["nb_vm_filter"])
)
netbox_vms = list(netbox.virtualization.virtual_machines.filter(**config["nb_vm_filter"]))
netbox_site_groups = convert_recordset((netbox.dcim.site_groups.all()))
netbox_regions = convert_recordset(netbox.dcim.regions.all())
netbox_journals = netbox.extras.journal_entries
@ -162,9 +145,7 @@ def main(arguments):
# Check if a valid template has been found for this VM.
if not vm.zbx_template_names:
continue
vm.set_hostgroup(
config["vm_hostgroup_format"], netbox_site_groups, netbox_regions
)
vm.set_hostgroup(config["vm_hostgroup_format"], netbox_site_groups, netbox_regions)
# Check if a valid hostgroup has been found for this VM.
if not vm.hostgroups:
continue
@ -181,9 +162,7 @@ def main(arguments):
continue
# Device has been added to NetBox
# but is not in Activate state
logger.info(
f"VM {vm.name}: skipping since this VM is not in the active state."
)
logger.info(f"VM {vm.name}: skipping since this VM is not in the active state.")
continue
# Check if the VM is in the disabled state
if vm.status in config["zabbix_device_disable"]:
@ -191,14 +170,14 @@ def main(arguments):
# Add hostgroup if config is set
if config["create_hostgroups"]:
# Create new hostgroup. Potentially multiple groups if nested
hostgroups = vm.createZabbixHostgroup(zabbix_groups)
hostgroups = vm.create_zabbix_hostgroup(zabbix_groups)
# go through all newly created hostgroups
for group in hostgroups:
# Add new hostgroups to zabbix group list
zabbix_groups.append(group)
# Check if VM is already in Zabbix
if vm.zabbix_id:
vm.ConsistencyCheck(
vm.consistency_check(
zabbix_groups,
zabbix_templates,
zabbix_proxy_list,
@ -207,7 +186,7 @@ def main(arguments):
)
continue
# Add VM to Zabbix
vm.createInZabbix(zabbix_groups, zabbix_templates, zabbix_proxy_list)
vm.create_in_zabbix(zabbix_groups, zabbix_templates, zabbix_proxy_list)
except SyncError:
pass
@ -230,9 +209,7 @@ def main(arguments):
# Check if a valid template has been found for this VM.
if not device.zbx_template_names:
continue
device.set_hostgroup(
config["hostgroup_format"], netbox_site_groups, netbox_regions
)
device.set_hostgroup(config["hostgroup_format"], netbox_site_groups, netbox_regions)
# Check if a valid hostgroup has been found for this VM.
if not device.hostgroups:
continue
@ -241,12 +218,10 @@ def main(arguments):
device.set_tags()
# Checks if device is part of cluster.
# Requires clustering variable
if device.isCluster() and config["clustering"]:
if device.is_cluster() and config["clustering"]:
# Check if device is primary or secondary
if device.promoteMasterDevice():
logger.info(
"Device %s: is part of cluster and primary.", device.name
)
if device.promote_master_device():
logger.info("Device %s: is part of cluster and primary.", device.name)
else:
# Device is secondary in cluster.
# Don't continue with this device.
@ -265,10 +240,7 @@ def main(arguments):
continue
# Device has been added to NetBox
# but is not in Activate state
logger.info(
f"Device {device.name}: skipping since this device is "
f"not in the active state."
)
logger.info(f"Device {device.name}: skipping since this device is not in the active state.")
continue
# Check if the device is in the disabled state
if device.status in config["zabbix_device_disable"]:
@ -276,14 +248,14 @@ def main(arguments):
# Add hostgroup is config is set
if config["create_hostgroups"]:
# Create new hostgroup. Potentially multiple groups if nested
hostgroups = device.createZabbixHostgroup(zabbix_groups)
hostgroups = device.create_zabbix_hostgroup(zabbix_groups)
# go through all newly created hostgroups
for group in hostgroups:
# Add new hostgroups to zabbix group list
zabbix_groups.append(group)
# Check if device is already in Zabbix
if device.zabbix_id:
device.ConsistencyCheck(
device.consistency_check(
zabbix_groups,
zabbix_templates,
zabbix_proxy_list,
@ -292,22 +264,16 @@ def main(arguments):
)
continue
# Add device to Zabbix
device.createInZabbix(zabbix_groups, zabbix_templates, zabbix_proxy_list)
device.create_in_zabbix(zabbix_groups, zabbix_templates, zabbix_proxy_list)
except SyncError:
pass
zabbix.logout()
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="A script to sync Zabbix with NetBox device data."
)
parser.add_argument(
"-v", "--verbose", help="Turn on debugging.", action="store_true"
)
parser.add_argument(
"-vv", "--debug", help="Turn on debugging.", action="store_true"
)
parser = argparse.ArgumentParser(description="A script to sync Zabbix with NetBox device data.")
parser.add_argument("-v", "--verbose", help="Turn on debugging.", action="store_true")
parser.add_argument("-vv", "--debug", help="Turn on debugging.", action="store_true")
parser.add_argument(
"-vvv",
"--debug-all",

View File

@ -13,9 +13,10 @@ from modules.config import (
def test_load_config_defaults():
"""Test that load_config returns default values when no config file or env vars are present"""
with patch(
"modules.config.load_config_file", return_value=DEFAULT_CONFIG.copy()
), patch("modules.config.load_env_variable", return_value=None):
with (
patch("modules.config.load_config_file", return_value=DEFAULT_CONFIG.copy()),
patch("modules.config.load_env_variable", return_value=None),
):
config = load_config()
assert config == DEFAULT_CONFIG
assert config["templates_config_context"] is False
@ -28,8 +29,9 @@ def test_load_config_file():
mock_config["templates_config_context"] = True
mock_config["sync_vms"] = True
with patch("modules.config.load_config_file", return_value=mock_config), patch(
"modules.config.load_env_variable", return_value=None
with (
patch("modules.config.load_config_file", return_value=mock_config),
patch("modules.config.load_env_variable", return_value=None),
):
config = load_config()
assert config["templates_config_context"] is True
@ -49,9 +51,10 @@ def test_load_env_variables():
return True
return None
with patch(
"modules.config.load_config_file", return_value=DEFAULT_CONFIG.copy()
), patch("modules.config.load_env_variable", side_effect=mock_load_env):
with (
patch("modules.config.load_config_file", return_value=DEFAULT_CONFIG.copy()),
patch("modules.config.load_env_variable", side_effect=mock_load_env),
):
config = load_config()
assert config["sync_vms"] is True
assert config["create_journal"] is True
@ -71,8 +74,9 @@ def test_env_vars_override_config_file():
return True
return None
with patch("modules.config.load_config_file", return_value=mock_config), patch(
"modules.config.load_env_variable", side_effect=mock_load_env
with (
patch("modules.config.load_config_file", return_value=mock_config),
patch("modules.config.load_env_variable", side_effect=mock_load_env),
):
config = load_config()
# This should be overridden by the env var
@ -84,9 +88,10 @@ def test_env_vars_override_config_file():
def test_load_config_file_function():
"""Test the load_config_file function directly"""
# Test when the file exists
with patch("pathlib.Path.exists", return_value=True), patch(
"importlib.util.spec_from_file_location"
) as mock_spec:
with (
patch("pathlib.Path.exists", return_value=True),
patch("importlib.util.spec_from_file_location") as mock_spec,
):
# Setup the mock module with attributes
mock_module = MagicMock()
mock_module.templates_config_context = True
@ -140,8 +145,12 @@ def test_load_config_file_exception_handling():
"""Test that load_config_file handles exceptions gracefully"""
# This test requires modifying the load_config_file function to handle exceptions
# For now, we're just checking that an exception is raised
with patch("pathlib.Path.exists", return_value=True), patch(
"importlib.util.spec_from_file_location", side_effect=Exception("Import error")
with (
patch("pathlib.Path.exists", return_value=True),
patch(
"importlib.util.spec_from_file_location",
side_effect=Exception("Import error"),
),
):
# Since the current implementation doesn't handle exceptions, we should
# expect an exception to be raised

View File

@ -68,7 +68,7 @@ class TestDeviceDeletion(unittest.TestCase):
self.mock_nb_device.save.assert_called_once()
self.assertIsNone(self.mock_nb_device.custom_fields["zabbix_hostid"])
self.mock_logger.info.assert_called_with(
f"Host {self.device.name}: " "Deleted host from Zabbix."
f"Host {self.device.name}: Deleted host from Zabbix."
)
def test_cleanup_device_already_deleted(self):

View File

@ -175,9 +175,7 @@ class TestHostgroups(unittest.TestCase):
# Custom format: site/tenant/platform/location
complex_result = hostgroup.generate("site/tenant/platform/location")
self.assertEqual(
complex_result, "TestSite/TestTenant/TestPlatform/TestLocation"
)
self.assertEqual(complex_result, "TestSite/TestTenant/TestPlatform/TestLocation")
def test_vm_hostgroup_formats(self):
"""Test different hostgroup formats for VMs."""
@ -325,13 +323,13 @@ class TestHostgroups(unittest.TestCase):
# Should include the parent site group
self.assertEqual(result, "TestSite/ParentSiteGroup/TestSiteGroup/TestRole")
def test_list_formatoptions(self):
"""Test the list_formatoptions method for debugging."""
def test_list_format_options(self):
"""Test the list_format_options method for debugging."""
hostgroup = Hostgroup("dev", self.mock_device, "4.0", self.mock_logger)
# Patch sys.stdout to capture print output
with patch("sys.stdout") as mock_stdout:
hostgroup.list_formatoptions()
hostgroup.list_format_options()
# Check that print was called with expected output
calls = [

View File

@ -85,14 +85,15 @@ class TestPhysicalDevice(unittest.TestCase):
)
def test_set_basics_with_special_characters(self):
"""Test _setBasics when device name contains special characters."""
"""Test _set_basics when device name contains special characters."""
# Set name with special characters that
# will actually trigger the special character detection
self.mock_nb_device.name = "test-devïce"
# We need to patch the search function to simulate finding special characters
with patch("modules.device.search") as mock_search, patch(
"modules.device.config", {"device_cf": "zabbix_hostid"}
with (
patch("modules.device.search") as mock_search,
patch("modules.device.config", {"device_cf": "zabbix_hostid"}),
):
# Make the search function return True to simulate special characters
mock_search.return_value = True
@ -115,9 +116,7 @@ class TestPhysicalDevice(unittest.TestCase):
def test_get_templates_context(self):
"""Test get_templates_context with valid config."""
# Set up config_context with valid template data
self.mock_nb_device.config_context = {
"zabbix": {"templates": ["Template1", "Template2"]}
}
self.mock_nb_device.config_context = {"zabbix": {"templates": ["Template1", "Template2"]}}
# Create device with the updated mock
with patch("modules.device.config", {"device_cf": "zabbix_hostid"}):
@ -196,9 +195,7 @@ class TestPhysicalDevice(unittest.TestCase):
self.mock_nb_device.config_context = {"zabbix": {"templates": ["Template1"]}}
# Mock get_templates_context to return expected templates
with patch.object(
PhysicalDevice, "get_templates_context", return_value=["Template1"]
):
with patch.object(PhysicalDevice, "get_templates_context", return_value=["Template1"]):
with patch("modules.device.config", {"device_cf": "zabbix_hostid"}):
device = PhysicalDevice(
self.mock_nb_device,
@ -209,9 +206,7 @@ class TestPhysicalDevice(unittest.TestCase):
)
# Call set_template with prefer_config_context=True
result = device.set_template(
prefer_config_context=True, overrule_custom=False
)
result = device.set_template(prefer_config_context=True, overrule_custom=False)
# Check result and template names
self.assertTrue(result)
@ -325,12 +320,10 @@ class TestPhysicalDevice(unittest.TestCase):
# Check result
self.assertTrue(result)
self.assertEqual(device.inventory_mode, 0) # Manual mode
self.assertEqual(
device.inventory, {"name": "test-device", "serialno_a": "ABC123"}
)
self.assertEqual(device.inventory, {"name": "test-device", "serialno_a": "ABC123"})
def test_iscluster_true(self):
"""Test isCluster when device is part of a cluster."""
def test_is_cluster_true(self):
"""Test is_cluster when device is part of a cluster."""
# Set up virtual_chassis
self.mock_nb_device.virtual_chassis = MagicMock()
@ -344,11 +337,11 @@ class TestPhysicalDevice(unittest.TestCase):
logger=self.mock_logger,
)
# Check isCluster result
self.assertTrue(device.isCluster())
# Check is_cluster result
self.assertTrue(device.is_cluster())
def test_is_cluster_false(self):
"""Test isCluster when device is not part of a cluster."""
"""Test is_cluster when device is not part of a cluster."""
# Set virtual_chassis to None
self.mock_nb_device.virtual_chassis = None
@ -362,18 +355,16 @@ class TestPhysicalDevice(unittest.TestCase):
logger=self.mock_logger,
)
# Check isCluster result
self.assertFalse(device.isCluster())
# Check is_cluster result
self.assertFalse(device.is_cluster())
def test_promote_master_device_primary(self):
"""Test promoteMasterDevice when device is primary in cluster."""
"""Test promote_master_device when device is primary in cluster."""
# Set up virtual chassis with master device
mock_vc = MagicMock()
mock_vc.name = "virtual-chassis-1"
mock_master = MagicMock()
mock_master.id = (
self.mock_nb_device.id
) # Set master ID to match the current device
mock_master.id = self.mock_nb_device.id # Set master ID to match the current device
mock_vc.master = mock_master
self.mock_nb_device.virtual_chassis = mock_vc
@ -386,8 +377,8 @@ class TestPhysicalDevice(unittest.TestCase):
logger=self.mock_logger,
)
# Call promoteMasterDevice and check the result
result = device.promoteMasterDevice()
# Call promote_master_device and check the result
result = device.promote_master_device()
# Should return True for primary device
self.assertTrue(result)
@ -395,14 +386,12 @@ class TestPhysicalDevice(unittest.TestCase):
self.assertEqual(device.name, "virtual-chassis-1")
def test_promote_master_device_secondary(self):
"""Test promoteMasterDevice when device is secondary in cluster."""
"""Test promote_master_device when device is secondary in cluster."""
# Set up virtual chassis with a different master device
mock_vc = MagicMock()
mock_vc.name = "virtual-chassis-1"
mock_master = MagicMock()
mock_master.id = (
self.mock_nb_device.id + 1
) # Different ID than the current device
mock_master.id = self.mock_nb_device.id + 1 # Different ID than the current device
mock_vc.master = mock_master
self.mock_nb_device.virtual_chassis = mock_vc
@ -415,8 +404,8 @@ class TestPhysicalDevice(unittest.TestCase):
logger=self.mock_logger,
)
# Call promoteMasterDevice and check the result
result = device.promoteMasterDevice()
# Call promote_master_device and check the result
result = device.promote_master_device()
# Should return False for secondary device
self.assertFalse(result)