From e5d4bb64f0f5e8ffc2056541b8528bbb0720d9dd Mon Sep 17 00:00:00 2001 From: TheNetworkGuy Date: Wed, 11 Feb 2026 15:51:35 +0000 Subject: [PATCH] Fixed linting on several files --- modules/config.py | 2 ++ modules/device.py | 2 +- modules/hostgroups.py | 2 +- modules/interface.py | 17 +++++++++-------- modules/tags.py | 6 +++++- modules/tools.py | 24 +++++++++++++++--------- netbox_zabbix_sync.py | 23 ++++++++++++----------- tests/test_interface.py | 16 ++++------------ tests/test_usermacros.py | 6 +++--- 9 files changed, 52 insertions(+), 46 deletions(-) diff --git a/modules/config.py b/modules/config.py index 8ea2673..e5509c6 100644 --- a/modules/config.py +++ b/modules/config.py @@ -123,6 +123,8 @@ def load_config_file(config_default, config_file="config.py"): dconf = config_default.copy() # Dynamically import the config module spec = util.spec_from_file_location("config", config_path) + if spec is None or spec.loader is None: + raise ImportError(f"Cannot load config from {config_path}") config_module = util.module_from_spec(spec) spec.loader.exec_module(config_module) # Update DEFAULT_CONFIG with variables from the config module diff --git a/modules/device.py b/modules/device.py index 76c754c..71cb6dd 100644 --- a/modules/device.py +++ b/modules/device.py @@ -541,7 +541,7 @@ class PhysicalDevice: # Set group and template ID's for host if not self.set_zbx_groupid(groups): e = ( - f"Unable to find group '{self.hostgroup}' " + f"Unable to find group '{self.hostgroups}' " f"for host {self.name} in Zabbix." ) self.logger.warning(e) diff --git a/modules/hostgroups.py b/modules/hostgroups.py index 29eb959..d5544be 100644 --- a/modules/hostgroups.py +++ b/modules/hostgroups.py @@ -26,7 +26,7 @@ class Hostgroup: self.logger = logger if logger else getLogger(__name__) if obj_type not in ("vm", "dev"): msg = f"Unable to create hostgroup with type {type}" - self.logger.error() + self.logger.error(msg) raise HostgroupError(msg) self.type = str(obj_type) self.nb = nb_obj diff --git a/modules/interface.py b/modules/interface.py index b86038f..4b79134 100644 --- a/modules/interface.py +++ b/modules/interface.py @@ -46,31 +46,32 @@ class ZabbixInterface: # Checks if SNMP settings are defined in NetBox if "snmp" in self.context["zabbix"]: snmp = self.context["zabbix"]["snmp"] - self.interface["details"] = {} + details: dict[str, str] = {} + self.interface["details"] = details # Checks if bulk config has been defined if "bulk" in snmp: - self.interface["details"]["bulk"] = str(snmp.pop("bulk")) + details["bulk"] = str(snmp.pop("bulk")) else: # Fallback to bulk enabled if not specified - self.interface["details"]["bulk"] = "1" + details["bulk"] = "1" # SNMP Version config is required in NetBox config context if snmp.get("version"): - self.interface["details"]["version"] = str(snmp.pop("version")) + details["version"] = str(snmp.pop("version")) else: e = "SNMP version option is not defined." raise InterfaceConfigError(e) # If version 1 or 2 is used, get community string - if self.interface["details"]["version"] in ["1", "2"]: + if details["version"] in ["1", "2"]: if "community" in snmp: # Set SNMP community to confix context value community = snmp["community"] else: # Set SNMP community to default community = "{$SNMP_COMMUNITY}" - self.interface["details"]["community"] = str(community) + details["community"] = str(community) # If version 3 has been used, get all # SNMPv3 NetBox related configs - elif self.interface["details"]["version"] == "3": + elif details["version"] == "3": items = [ "securityname", "securitylevel", @@ -82,7 +83,7 @@ class ZabbixInterface: ] for key, item in snmp.items(): if key in items: - self.interface["details"][key] = str(item) + details[key] = str(item) else: e = "Unsupported SNMP version." raise InterfaceConfigError(e) diff --git a/modules/tags.py b/modules/tags.py index 11514ab..21497f6 100644 --- a/modules/tags.py +++ b/modules/tags.py @@ -128,7 +128,11 @@ class ZabbixTags: # Pull in NetBox device tags if tag_name is set if self.tag_name and isinstance(self.tag_name, str): for tag in self.nb.tags: - if self.tag_value.lower() in ["display", "name", "slug"]: + if ( + self.tag_value + and isinstance(self.tag_value, str) + and self.tag_value.lower() in ["display", "name", "slug"] + ): value = tag[self.tag_value] else: value = tag["name"] diff --git a/modules/tools.py b/modules/tools.py index be0d8a4..a6b0a22 100644 --- a/modules/tools.py +++ b/modules/tools.py @@ -1,7 +1,7 @@ """A collection of tools used by several classes""" from collections.abc import Callable -from typing import Any, overload +from typing import Any, cast, overload from modules.exceptions import HostgroupError @@ -23,10 +23,14 @@ def build_path(endpoint, list_of_dicts): item_path = [] itemlist = [i for i in list_of_dicts if i["name"] == endpoint] item = itemlist[0] if len(itemlist) == 1 else None + if item is None: + return [] item_path.append(item["name"]) while item["_depth"] > 0: itemlist = [i for i in list_of_dicts if i["name"] == str(item["parent"])] item = itemlist[0] if len(itemlist) == 1 else None + if item is None: + break item_path.append(item["name"]) item_path.reverse() return item_path @@ -60,9 +64,10 @@ def cf_to_string(cf, key="name", logger=None): if isinstance(cf, dict): if key in cf: return cf[key] - logger.error( - "Conversion of custom field failed, '%s' not found in cf dict.", key - ) + if logger: + logger.error( + "Conversion of custom field failed, '%s' not found in cf dict.", key + ) return None return cf @@ -145,7 +150,7 @@ def remove_duplicates( output_list.sort(key=lambda x: x[sortkey]) elif sortkey and callable(sortkey): - output_list.sort(key=sortkey) + output_list.sort(key=cast(Any, sortkey)) return output_list @@ -190,9 +195,9 @@ def verify_hg_format( "cfs": {"dev": [], "vm": []}, } for cf in device_cfs: - allowed_objects["cfs"]["dev"].append(cf.name) + allowed_objects["cfs"]["dev"].append(cf.name) # type: ignore[index] for cf in vm_cfs: - allowed_objects["cfs"]["vm"].append(cf.name) + allowed_objects["cfs"]["vm"].append(cf.name) # type: ignore[index] hg_objects = [] if isinstance(hg_format, list): for f in hg_format: @@ -203,14 +208,15 @@ def verify_hg_format( for hg_object in hg_objects: if ( hg_object not in allowed_objects[hg_type] - and hg_object not in allowed_objects["cfs"][hg_type] + and hg_object not in allowed_objects["cfs"][hg_type] # type: ignore[index] and not hg_object.startswith(('"', "'")) ): e = ( f"Hostgroup item {hg_object} is not valid. Make sure you" " use valid items and separate them with '/'." ) - logger.warning(e) + if logger: + logger.warning(e) raise HostgroupError(e) diff --git a/netbox_zabbix_sync.py b/netbox_zabbix_sync.py index 8996b38..db4edc9 100755 --- a/netbox_zabbix_sync.py +++ b/netbox_zabbix_sync.py @@ -6,7 +6,8 @@ import argparse import logging import ssl -from os import environ, sys +import sys +from os import environ from pynetbox import api from pynetbox.core.query import RequestError as NBRequestError @@ -131,13 +132,13 @@ def main(arguments): netbox_site_groups = convert_recordset(netbox.dcim.site_groups.all()) netbox_regions = convert_recordset(netbox.dcim.regions.all()) netbox_journals = netbox.extras.journal_entries - zabbix_groups = zabbix.hostgroup.get(output=["groupid", "name"]) - zabbix_templates = zabbix.template.get(output=["templateid", "name"]) - zabbix_proxies = zabbix.proxy.get(output=["proxyid", proxy_name]) + zabbix_groups = zabbix.hostgroup.get(output=["groupid", "name"]) # type: ignore[attr-defined] + zabbix_templates = zabbix.template.get(output=["templateid", "name"]) # type: ignore[attr-defined] + zabbix_proxies = zabbix.proxy.get(output=["proxyid", proxy_name]) # type: ignore[attr-defined] # Set empty list for proxy processing Zabbix <= 6 zabbix_proxygroups = [] if str(zabbix.version).startswith("7"): - zabbix_proxygroups = zabbix.proxygroup.get(output=["proxy_groupid", "name"]) + zabbix_proxygroups = zabbix.proxygroup.get(output=["proxy_groupid", "name"]) # type: ignore[attr-defined] # Sanitize proxy data if proxy_name == "host": for proxy in zabbix_proxies: @@ -169,7 +170,7 @@ def main(arguments): continue if config["extended_site_properties"] and nb_vm.site: logger.debug("VM %s: extending site information.", vm.name) - vm.site = convert_recordset(netbox.dcim.sites.filter(id=nb_vm.site.id)) + vm.site = convert_recordset(netbox.dcim.sites.filter(id=nb_vm.site.id)) # type: ignore[attr-defined] vm.set_inventory(nb_vm) vm.set_usermacros() vm.set_tags() @@ -193,14 +194,14 @@ def main(arguments): # Add hostgroup if config is set if config["create_hostgroups"]: # Create new hostgroup. Potentially multiple groups if nested - hostgroups = vm.createZabbixHostgroup(zabbix_groups) + hostgroups = vm.create_zbx_hostgroup(zabbix_groups) # go through all newly created hostgroups for group in hostgroups: # Add new hostgroups to zabbix group list zabbix_groups.append(group) # Check if VM is already in Zabbix if vm.zabbix_id: - vm.ConsistencyCheck( + vm.consistency_check( zabbix_groups, zabbix_templates, zabbix_proxy_list, @@ -244,7 +245,7 @@ def main(arguments): continue if config["extended_site_properties"] and nb_device.site: logger.debug("Device %s: extending site information.", device.name) - device.site = convert_recordset( + device.site = convert_recordset( # type: ignore[attr-defined] netbox.dcim.sites.filter(id=nb_device.site.id) ) device.set_inventory(nb_device) @@ -252,7 +253,7 @@ def main(arguments): device.set_tags() # Checks if device is part of cluster. # Requires clustering variable - if device.isCluster() and config["clustering"]: + if device.is_cluster() and config["clustering"]: # Check if device is primary or secondary if device.promote_primary_device(): logger.info( @@ -287,7 +288,7 @@ def main(arguments): # Add hostgroup is config is set if config["create_hostgroups"]: # Create new hostgroup. Potentially multiple groups if nested - hostgroups = device.createZabbixHostgroup(zabbix_groups) + hostgroups = device.create_zbx_hostgroup(zabbix_groups) # go through all newly created hostgroups for group in hostgroups: # Add new hostgroups to zabbix group list diff --git a/tests/test_interface.py b/tests/test_interface.py index 5d809fe..3c37413 100644 --- a/tests/test_interface.py +++ b/tests/test_interface.py @@ -142,13 +142,9 @@ class TestZabbixInterface(unittest.TestCase): self.assertEqual(details["securityname"], "snmpuser") self.assertEqual(details["securitylevel"], "authPriv") self.assertEqual(details["authprotocol"], "SHA") - self.assertEqual( - details["authpassphrase"], "authpass123" - ) + self.assertEqual(details["authpassphrase"], "authpass123") self.assertEqual(details["privprotocol"], "AES") - self.assertEqual( - details["privpassphrase"], "privpass123" - ) + self.assertEqual(details["privpassphrase"], "privpass123") self.assertEqual(details["contextname"], "context1") def test_set_snmp_no_snmp_config(self): @@ -217,9 +213,7 @@ class TestZabbixInterface(unittest.TestCase): self.assertEqual(interface.interface["port"], "161") details = cast(dict[str, str], interface.interface["details"]) self.assertEqual(details["version"], "2") - self.assertEqual( - details["community"], "{$SNMP_COMMUNITY}" - ) + self.assertEqual(details["community"], "{$SNMP_COMMUNITY}") self.assertEqual(details["bulk"], "1") def test_set_default_agent(self): @@ -243,6 +237,4 @@ class TestZabbixInterface(unittest.TestCase): # Should use default community string details = cast(dict[str, str], interface.interface["details"]) - self.assertEqual( - details["community"], "{$SNMP_COMMUNITY}" - ) + self.assertEqual(details["community"], "{$SNMP_COMMUNITY}") diff --git a/tests/test_usermacros.py b/tests/test_usermacros.py index aeb77df..dbcfaed 100644 --- a/tests/test_usermacros.py +++ b/tests/test_usermacros.py @@ -54,7 +54,7 @@ class TestUsermacroSync(unittest.TestCase): "modules.device.config", {"usermacro_sync": False, "device_cf": "zabbix_hostid", "tag_sync": False}, ) - @patch.object(PhysicalDevice, '_usermacro_map') + @patch.object(PhysicalDevice, "_usermacro_map") def test_usermacro_sync_false(self, mock_usermacro_map): mock_usermacro_map.return_value = self.usermacro_map device = self.create_mock_device() @@ -70,7 +70,7 @@ class TestUsermacroSync(unittest.TestCase): {"usermacro_sync": True, "device_cf": "zabbix_hostid", "tag_sync": False}, ) @patch("modules.device.ZabbixUsermacros") - @patch.object(PhysicalDevice, '_usermacro_map') + @patch.object(PhysicalDevice, "_usermacro_map") def test_usermacro_sync_true(self, mock_usermacro_map, mock_usermacros_class): mock_usermacro_map.return_value = self.usermacro_map # Mock the ZabbixUsermacros class to return some test data @@ -94,7 +94,7 @@ class TestUsermacroSync(unittest.TestCase): {"usermacro_sync": "full", "device_cf": "zabbix_hostid", "tag_sync": False}, ) @patch("modules.device.ZabbixUsermacros") - @patch.object(PhysicalDevice, '_usermacro_map') + @patch.object(PhysicalDevice, "_usermacro_map") def test_usermacro_sync_full(self, mock_usermacro_map, mock_usermacros_class): mock_usermacro_map.return_value = self.usermacro_map # Mock the ZabbixUsermacros class to return some test data