Fixed linting on several files

This commit is contained in:
TheNetworkGuy
2026-02-11 15:51:35 +00:00
parent 3227bb3165
commit e5d4bb64f0
9 changed files with 52 additions and 46 deletions
+2
View File
@@ -123,6 +123,8 @@ def load_config_file(config_default, config_file="config.py"):
dconf = config_default.copy()
# Dynamically import the config module
spec = util.spec_from_file_location("config", config_path)
if spec is None or spec.loader is None:
raise ImportError(f"Cannot load config from {config_path}")
config_module = util.module_from_spec(spec)
spec.loader.exec_module(config_module)
# Update DEFAULT_CONFIG with variables from the config module
+1 -1
View File
@@ -541,7 +541,7 @@ class PhysicalDevice:
# Set group and template ID's for host
if not self.set_zbx_groupid(groups):
e = (
f"Unable to find group '{self.hostgroup}' "
f"Unable to find group '{self.hostgroups}' "
f"for host {self.name} in Zabbix."
)
self.logger.warning(e)
+1 -1
View File
@@ -26,7 +26,7 @@ class Hostgroup:
self.logger = logger if logger else getLogger(__name__)
if obj_type not in ("vm", "dev"):
msg = f"Unable to create hostgroup with type {type}"
self.logger.error()
self.logger.error(msg)
raise HostgroupError(msg)
self.type = str(obj_type)
self.nb = nb_obj
+9 -8
View File
@@ -46,31 +46,32 @@ class ZabbixInterface:
# Checks if SNMP settings are defined in NetBox
if "snmp" in self.context["zabbix"]:
snmp = self.context["zabbix"]["snmp"]
self.interface["details"] = {}
details: dict[str, str] = {}
self.interface["details"] = details
# Checks if bulk config has been defined
if "bulk" in snmp:
self.interface["details"]["bulk"] = str(snmp.pop("bulk"))
details["bulk"] = str(snmp.pop("bulk"))
else:
# Fallback to bulk enabled if not specified
self.interface["details"]["bulk"] = "1"
details["bulk"] = "1"
# SNMP Version config is required in NetBox config context
if snmp.get("version"):
self.interface["details"]["version"] = str(snmp.pop("version"))
details["version"] = str(snmp.pop("version"))
else:
e = "SNMP version option is not defined."
raise InterfaceConfigError(e)
# If version 1 or 2 is used, get community string
if self.interface["details"]["version"] in ["1", "2"]:
if details["version"] in ["1", "2"]:
if "community" in snmp:
# Set SNMP community to confix context value
community = snmp["community"]
else:
# Set SNMP community to default
community = "{$SNMP_COMMUNITY}"
self.interface["details"]["community"] = str(community)
details["community"] = str(community)
# If version 3 has been used, get all
# SNMPv3 NetBox related configs
elif self.interface["details"]["version"] == "3":
elif details["version"] == "3":
items = [
"securityname",
"securitylevel",
@@ -82,7 +83,7 @@ class ZabbixInterface:
]
for key, item in snmp.items():
if key in items:
self.interface["details"][key] = str(item)
details[key] = str(item)
else:
e = "Unsupported SNMP version."
raise InterfaceConfigError(e)
+5 -1
View File
@@ -128,7 +128,11 @@ class ZabbixTags:
# Pull in NetBox device tags if tag_name is set
if self.tag_name and isinstance(self.tag_name, str):
for tag in self.nb.tags:
if self.tag_value.lower() in ["display", "name", "slug"]:
if (
self.tag_value
and isinstance(self.tag_value, str)
and self.tag_value.lower() in ["display", "name", "slug"]
):
value = tag[self.tag_value]
else:
value = tag["name"]
+15 -9
View File
@@ -1,7 +1,7 @@
"""A collection of tools used by several classes"""
from collections.abc import Callable
from typing import Any, overload
from typing import Any, cast, overload
from modules.exceptions import HostgroupError
@@ -23,10 +23,14 @@ def build_path(endpoint, list_of_dicts):
item_path = []
itemlist = [i for i in list_of_dicts if i["name"] == endpoint]
item = itemlist[0] if len(itemlist) == 1 else None
if item is None:
return []
item_path.append(item["name"])
while item["_depth"] > 0:
itemlist = [i for i in list_of_dicts if i["name"] == str(item["parent"])]
item = itemlist[0] if len(itemlist) == 1 else None
if item is None:
break
item_path.append(item["name"])
item_path.reverse()
return item_path
@@ -60,9 +64,10 @@ def cf_to_string(cf, key="name", logger=None):
if isinstance(cf, dict):
if key in cf:
return cf[key]
logger.error(
"Conversion of custom field failed, '%s' not found in cf dict.", key
)
if logger:
logger.error(
"Conversion of custom field failed, '%s' not found in cf dict.", key
)
return None
return cf
@@ -145,7 +150,7 @@ def remove_duplicates(
output_list.sort(key=lambda x: x[sortkey])
elif sortkey and callable(sortkey):
output_list.sort(key=sortkey)
output_list.sort(key=cast(Any, sortkey))
return output_list
@@ -190,9 +195,9 @@ def verify_hg_format(
"cfs": {"dev": [], "vm": []},
}
for cf in device_cfs:
allowed_objects["cfs"]["dev"].append(cf.name)
allowed_objects["cfs"]["dev"].append(cf.name) # type: ignore[index]
for cf in vm_cfs:
allowed_objects["cfs"]["vm"].append(cf.name)
allowed_objects["cfs"]["vm"].append(cf.name) # type: ignore[index]
hg_objects = []
if isinstance(hg_format, list):
for f in hg_format:
@@ -203,14 +208,15 @@ def verify_hg_format(
for hg_object in hg_objects:
if (
hg_object not in allowed_objects[hg_type]
and hg_object not in allowed_objects["cfs"][hg_type]
and hg_object not in allowed_objects["cfs"][hg_type] # type: ignore[index]
and not hg_object.startswith(('"', "'"))
):
e = (
f"Hostgroup item {hg_object} is not valid. Make sure you"
" use valid items and separate them with '/'."
)
logger.warning(e)
if logger:
logger.warning(e)
raise HostgroupError(e)
+12 -11
View File
@@ -6,7 +6,8 @@
import argparse
import logging
import ssl
from os import environ, sys
import sys
from os import environ
from pynetbox import api
from pynetbox.core.query import RequestError as NBRequestError
@@ -131,13 +132,13 @@ def main(arguments):
netbox_site_groups = convert_recordset(netbox.dcim.site_groups.all())
netbox_regions = convert_recordset(netbox.dcim.regions.all())
netbox_journals = netbox.extras.journal_entries
zabbix_groups = zabbix.hostgroup.get(output=["groupid", "name"])
zabbix_templates = zabbix.template.get(output=["templateid", "name"])
zabbix_proxies = zabbix.proxy.get(output=["proxyid", proxy_name])
zabbix_groups = zabbix.hostgroup.get(output=["groupid", "name"]) # type: ignore[attr-defined]
zabbix_templates = zabbix.template.get(output=["templateid", "name"]) # type: ignore[attr-defined]
zabbix_proxies = zabbix.proxy.get(output=["proxyid", proxy_name]) # type: ignore[attr-defined]
# Set empty list for proxy processing Zabbix <= 6
zabbix_proxygroups = []
if str(zabbix.version).startswith("7"):
zabbix_proxygroups = zabbix.proxygroup.get(output=["proxy_groupid", "name"])
zabbix_proxygroups = zabbix.proxygroup.get(output=["proxy_groupid", "name"]) # type: ignore[attr-defined]
# Sanitize proxy data
if proxy_name == "host":
for proxy in zabbix_proxies:
@@ -169,7 +170,7 @@ def main(arguments):
continue
if config["extended_site_properties"] and nb_vm.site:
logger.debug("VM %s: extending site information.", vm.name)
vm.site = convert_recordset(netbox.dcim.sites.filter(id=nb_vm.site.id))
vm.site = convert_recordset(netbox.dcim.sites.filter(id=nb_vm.site.id)) # type: ignore[attr-defined]
vm.set_inventory(nb_vm)
vm.set_usermacros()
vm.set_tags()
@@ -193,14 +194,14 @@ def main(arguments):
# Add hostgroup if config is set
if config["create_hostgroups"]:
# Create new hostgroup. Potentially multiple groups if nested
hostgroups = vm.createZabbixHostgroup(zabbix_groups)
hostgroups = vm.create_zbx_hostgroup(zabbix_groups)
# go through all newly created hostgroups
for group in hostgroups:
# Add new hostgroups to zabbix group list
zabbix_groups.append(group)
# Check if VM is already in Zabbix
if vm.zabbix_id:
vm.ConsistencyCheck(
vm.consistency_check(
zabbix_groups,
zabbix_templates,
zabbix_proxy_list,
@@ -244,7 +245,7 @@ def main(arguments):
continue
if config["extended_site_properties"] and nb_device.site:
logger.debug("Device %s: extending site information.", device.name)
device.site = convert_recordset(
device.site = convert_recordset( # type: ignore[attr-defined]
netbox.dcim.sites.filter(id=nb_device.site.id)
)
device.set_inventory(nb_device)
@@ -252,7 +253,7 @@ def main(arguments):
device.set_tags()
# Checks if device is part of cluster.
# Requires clustering variable
if device.isCluster() and config["clustering"]:
if device.is_cluster() and config["clustering"]:
# Check if device is primary or secondary
if device.promote_primary_device():
logger.info(
@@ -287,7 +288,7 @@ def main(arguments):
# Add hostgroup is config is set
if config["create_hostgroups"]:
# Create new hostgroup. Potentially multiple groups if nested
hostgroups = device.createZabbixHostgroup(zabbix_groups)
hostgroups = device.create_zbx_hostgroup(zabbix_groups)
# go through all newly created hostgroups
for group in hostgroups:
# Add new hostgroups to zabbix group list
+4 -12
View File
@@ -142,13 +142,9 @@ class TestZabbixInterface(unittest.TestCase):
self.assertEqual(details["securityname"], "snmpuser")
self.assertEqual(details["securitylevel"], "authPriv")
self.assertEqual(details["authprotocol"], "SHA")
self.assertEqual(
details["authpassphrase"], "authpass123"
)
self.assertEqual(details["authpassphrase"], "authpass123")
self.assertEqual(details["privprotocol"], "AES")
self.assertEqual(
details["privpassphrase"], "privpass123"
)
self.assertEqual(details["privpassphrase"], "privpass123")
self.assertEqual(details["contextname"], "context1")
def test_set_snmp_no_snmp_config(self):
@@ -217,9 +213,7 @@ class TestZabbixInterface(unittest.TestCase):
self.assertEqual(interface.interface["port"], "161")
details = cast(dict[str, str], interface.interface["details"])
self.assertEqual(details["version"], "2")
self.assertEqual(
details["community"], "{$SNMP_COMMUNITY}"
)
self.assertEqual(details["community"], "{$SNMP_COMMUNITY}")
self.assertEqual(details["bulk"], "1")
def test_set_default_agent(self):
@@ -243,6 +237,4 @@ class TestZabbixInterface(unittest.TestCase):
# Should use default community string
details = cast(dict[str, str], interface.interface["details"])
self.assertEqual(
details["community"], "{$SNMP_COMMUNITY}"
)
self.assertEqual(details["community"], "{$SNMP_COMMUNITY}")
+3 -3
View File
@@ -54,7 +54,7 @@ class TestUsermacroSync(unittest.TestCase):
"modules.device.config",
{"usermacro_sync": False, "device_cf": "zabbix_hostid", "tag_sync": False},
)
@patch.object(PhysicalDevice, '_usermacro_map')
@patch.object(PhysicalDevice, "_usermacro_map")
def test_usermacro_sync_false(self, mock_usermacro_map):
mock_usermacro_map.return_value = self.usermacro_map
device = self.create_mock_device()
@@ -70,7 +70,7 @@ class TestUsermacroSync(unittest.TestCase):
{"usermacro_sync": True, "device_cf": "zabbix_hostid", "tag_sync": False},
)
@patch("modules.device.ZabbixUsermacros")
@patch.object(PhysicalDevice, '_usermacro_map')
@patch.object(PhysicalDevice, "_usermacro_map")
def test_usermacro_sync_true(self, mock_usermacro_map, mock_usermacros_class):
mock_usermacro_map.return_value = self.usermacro_map
# Mock the ZabbixUsermacros class to return some test data
@@ -94,7 +94,7 @@ class TestUsermacroSync(unittest.TestCase):
{"usermacro_sync": "full", "device_cf": "zabbix_hostid", "tag_sync": False},
)
@patch("modules.device.ZabbixUsermacros")
@patch.object(PhysicalDevice, '_usermacro_map')
@patch.object(PhysicalDevice, "_usermacro_map")
def test_usermacro_sync_full(self, mock_usermacro_map, mock_usermacros_class):
mock_usermacro_map.return_value = self.usermacro_map
# Mock the ZabbixUsermacros class to return some test data