diff --git a/modules/device.py b/modules/device.py index 4ec96b5..83e6fdc 100644 --- a/modules/device.py +++ b/modules/device.py @@ -3,48 +3,61 @@ """ Device specific handeling for NetBox to Zabbix """ -from os import sys -from re import search from copy import deepcopy from logging import getLogger +from os import sys +from re import search + from zabbix_utils import APIRequestError -from modules.exceptions import (SyncInventoryError, TemplateError, SyncExternalError, - InterfaceConfigError, JournalError) -from modules.interface import ZabbixInterface -from modules.usermacros import ZabbixUsermacros -from modules.tags import ZabbixTags + +from modules.exceptions import ( + InterfaceConfigError, + JournalError, + SyncExternalError, + SyncInventoryError, + TemplateError, +) from modules.hostgroups import Hostgroup +from modules.interface import ZabbixInterface +from modules.tags import ZabbixTags from modules.tools import field_mapper, remove_duplicates +from modules.usermacros import ZabbixUsermacros try: from config import ( - template_cf, device_cf, - traverse_site_groups, - traverse_regions, - inventory_sync, - inventory_mode, + device_cf, device_inventory_map, - usermacro_sync, + device_tag_map, device_usermacro_map, - tag_sync, + inventory_mode, + inventory_sync, tag_lower, tag_name, + tag_sync, tag_value, - device_tag_map + template_cf, + traverse_regions, + traverse_site_groups, + usermacro_sync, ) except ModuleNotFoundError: - print("Configuration file config.py not found in main directory." - "Please create the file or rename the config.py.example file to config.py.") + print( + "Configuration file config.py not found in main directory." + "Please create the file or rename the config.py.example file to config.py." + ) sys.exit(0) -class PhysicalDevice(): + +class PhysicalDevice: # pylint: disable=too-many-instance-attributes, too-many-arguments, too-many-positional-arguments """ Represents Network device. INPUT: (NetBox device class, ZabbixAPI class, journal flag, NB journal class) """ - def __init__(self, nb, zabbix, nb_journal_class, nb_version, journal=None, logger=None): + def __init__( + self, nb, zabbix, nb_journal_class, nb_version, journal=None, logger=None + ): self.nb = nb self.id = nb.id self.name = nb.name @@ -77,15 +90,15 @@ class PhysicalDevice(): return self.__repr__() def _inventory_map(self): - """ Use device inventory maps """ + """Use device inventory maps""" return device_inventory_map def _usermacro_map(self): - """ Use device inventory maps """ + """Use device inventory maps""" return device_usermacro_map def _tag_map(self): - """ Use device host tag maps """ + """Use device host tag maps""" return device_tag_map def _setBasics(self): @@ -112,30 +125,38 @@ class PhysicalDevice(): # Validate hostname format. odd_character_list = ["ä", "ö", "ü", "Ä", "Ö", "Ü", "ß"] self.use_visible_name = False - if (any(letter in self.name for letter in odd_character_list) or - bool(search('[\u0400-\u04FF]', self.name))): + if any(letter in self.name for letter in odd_character_list) or bool( + search("[\u0400-\u04ff]", self.name) + ): self.name = f"NETBOX_ID{self.id}" self.visible_name = self.nb.name self.use_visible_name = True - self.logger.info(f"Host {self.visible_name} contains special characters. " - f"Using {self.name} as name for the NetBox object " - f"and using {self.visible_name} as visible name in Zabbix.") + self.logger.info( + f"Host {self.visible_name} contains special characters. " + f"Using {self.name} as name for the NetBox object " + f"and using {self.visible_name} as visible name in Zabbix." + ) else: pass def set_hostgroup(self, hg_format, nb_site_groups, nb_regions): """Set the hostgroup for this device""" # Create new Hostgroup instance - hg = Hostgroup("dev", self.nb, self.nb_api_version, logger=self.logger, - nested_sitegroup_flag=traverse_site_groups, - nested_region_flag=traverse_regions, - nb_groups=nb_site_groups, - nb_regions=nb_regions) + hg = Hostgroup( + "dev", + self.nb, + self.nb_api_version, + logger=self.logger, + nested_sitegroup_flag=traverse_site_groups, + nested_region_flag=traverse_regions, + nb_groups=nb_site_groups, + nb_regions=nb_regions, + ) # Generate hostgroup based on hostgroup format self.hostgroup = hg.generate(hg_format) def set_template(self, prefer_config_context, overrule_custom): - """ Set Template """ + """Set Template""" self.zbx_template_names = None # Gather templates ONLY from the device specific context if prefer_config_context: @@ -159,7 +180,7 @@ class PhysicalDevice(): return True def get_templates_cf(self): - """ Get template from custom field """ + """Get template from custom field""" # Get Zabbix templates from the device type device_type_cfs = self.nb.device_type.custom_fields # Check if the ZBX Template CF is present @@ -167,20 +188,26 @@ class PhysicalDevice(): # Set value to template return [device_type_cfs[template_cf]] # Custom field not found, return error - e = (f"Custom field {template_cf} not " + e = ( + f"Custom field {template_cf} not " f"found for {self.nb.device_type.manufacturer.name}" - f" - {self.nb.device_type.display}.") + f" - {self.nb.device_type.display}." + ) raise TemplateError(e) def get_templates_context(self): - """ Get Zabbix templates from the device context """ + """Get Zabbix templates from the device context""" if "zabbix" not in self.config_context: - e = (f"Host {self.name}: Key 'zabbix' not found in config " - "context for template lookup") + e = ( + f"Host {self.name}: Key 'zabbix' not found in config " + "context for template lookup" + ) raise TemplateError(e) if "templates" not in self.config_context["zabbix"]: - e = (f"Host {self.name}: Key 'templates' not found in config " - "context 'zabbix' for template lookup") + e = ( + f"Host {self.name}: Key 'templates' not found in config " + "context 'zabbix' for template lookup" + ) raise TemplateError(e) # Check if format is list or string. if isinstance(self.config_context["zabbix"]["templates"], str): @@ -188,25 +215,31 @@ class PhysicalDevice(): return self.config_context["zabbix"]["templates"] def set_inventory(self, nbdevice): - """ Set host inventory """ + """Set host inventory""" # Set inventory mode. Default is disabled (see class init function). if inventory_mode == "disabled": if inventory_sync: - self.logger.error(f"Host {self.name}: Unable to map NetBox inventory to Zabbix. " - "Inventory sync is enabled in config but inventory mode is disabled.") + self.logger.error( + f"Host {self.name}: Unable to map NetBox inventory to Zabbix. " + "Inventory sync is enabled in config but inventory mode is disabled." + ) return True if inventory_mode == "manual": self.inventory_mode = 0 elif inventory_mode == "automatic": self.inventory_mode = 1 else: - self.logger.error(f"Host {self.name}: Specified value for inventory mode in" - f" config is not valid. Got value {inventory_mode}") + self.logger.error( + f"Host {self.name}: Specified value for inventory mode in" + f" config is not valid. Got value {inventory_mode}" + ) return False self.inventory = {} - if inventory_sync and self.inventory_mode in [0,1]: + if inventory_sync and self.inventory_mode in [0, 1]: self.logger.debug(f"Host {self.name}: Starting inventory mapper") - self.inventory = field_mapper(self.name, self._inventory_map(), nbdevice, self.logger) + self.inventory = field_mapper( + self.name, self._inventory_map(), nbdevice, self.logger + ) return True def isCluster(self): @@ -220,13 +253,17 @@ class PhysicalDevice(): Returns chassis master ID. """ if not self.isCluster(): - e = (f"Unable to proces {self.name} for cluster calculation: " - f"not part of a cluster.") + e = ( + f"Unable to proces {self.name} for cluster calculation: " + f"not part of a cluster." + ) self.logger.warning(e) raise SyncInventoryError(e) if not self.nb.virtual_chassis.master: - e = (f"{self.name} is part of a NetBox virtual chassis which does " - "not have a master configured. Skipping for this reason.") + e = ( + f"{self.name} is part of a NetBox virtual chassis which does " + "not have a master configured. Skipping for this reason." + ) self.logger.error(e) raise SyncInventoryError(e) return self.nb.virtual_chassis.master.id @@ -239,9 +276,11 @@ class PhysicalDevice(): """ masterid = self.getClusterMaster() if masterid == self.id: - self.logger.debug(f"Host {self.name} is primary cluster member. " - f"Modifying hostname from {self.name} to " + - f"{self.nb.virtual_chassis.name}.") + self.logger.debug( + f"Host {self.name} is primary cluster member. " + f"Modifying hostname from {self.name} to " + + f"{self.nb.virtual_chassis.name}." + ) self.name = self.nb.virtual_chassis.name return True self.logger.debug(f"Host {self.name} is non-primary cluster member.") @@ -266,18 +305,24 @@ class PhysicalDevice(): # Go through all templates found in Zabbix for zbx_template in templates: # If the template names match - if zbx_template['name'] == nb_template: + if zbx_template["name"] == nb_template: # Set match variable to true, add template details # to class variable and return debug log template_match = True - self.zbx_templates.append({"templateid": zbx_template['templateid'], - "name": zbx_template['name']}) + self.zbx_templates.append( + { + "templateid": zbx_template["templateid"], + "name": zbx_template["name"], + } + ) e = f"Host {self.name}: found template {zbx_template['name']}" self.logger.debug(e) # Return error should the template not be found in Zabbix if not template_match: - e = (f"Unable to find template {nb_template} " - f"for host {self.name} in Zabbix. Skipping host...") + e = ( + f"Unable to find template {nb_template} " + f"for host {self.name} in Zabbix. Skipping host..." + ) self.logger.warning(e) raise SyncInventoryError(e) @@ -289,8 +334,8 @@ class PhysicalDevice(): """ # Go through all groups for group in groups: - if group['name'] == self.hostgroup: - self.group_id = group['groupid'] + if group["name"] == self.hostgroup: + self.group_id = group["groupid"] e = f"Host {self.name}: matched group {group['name']}" self.logger.debug(e) return True @@ -304,10 +349,13 @@ class PhysicalDevice(): if self.zabbix_id: try: # Check if the Zabbix host exists in Zabbix - zbx_host = bool(self.zabbix.host.get(filter={'hostid': self.zabbix_id}, - output=[])) - e = (f"Host {self.name}: was already deleted from Zabbix." - " Removed link in NetBox.") + zbx_host = bool( + self.zabbix.host.get(filter={"hostid": self.zabbix_id}, output=[]) + ) + e = ( + f"Host {self.name}: was already deleted from Zabbix." + " Removed link in NetBox." + ) if zbx_host: # Delete host should it exists self.zabbix.host.delete(self.zabbix_id) @@ -332,9 +380,9 @@ class PhysicalDevice(): """ # Validate the hostname or visible name field if not self.use_visible_name: - zbx_filter = {'host': self.name} + zbx_filter = {"host": self.name} else: - zbx_filter = {'name': self.visible_name} + zbx_filter = {"name": self.visible_name} host = self.zabbix.host.get(filter=zbx_filter, output=[]) return bool(host) @@ -364,24 +412,33 @@ class PhysicalDevice(): """ Generates Usermacros """ - macros = ZabbixUsermacros(self.nb, self._usermacro_map(), - usermacro_sync, logger=self.logger, - host=self.name) + macros = ZabbixUsermacros( + self.nb, + self._usermacro_map(), + usermacro_sync, + logger=self.logger, + host=self.name, + ) if macros.sync is False: self.usermacros = [] self.usermacros = macros.generate() return True - def set_tags(self): """ Generates Host Tags """ - tags = ZabbixTags(self.nb, self._tag_map(), - tag_sync, tag_lower, tag_name=tag_name, - tag_value=tag_value, logger=self.logger, - host=self.name) + tags = ZabbixTags( + self.nb, + self._tag_map(), + tag_sync, + tag_lower, + tag_name=tag_name, + tag_value=tag_value, + logger=self.logger, + host=self.name, + ) if tags.sync is False: self.tags = [] @@ -398,14 +455,16 @@ class PhysicalDevice(): # check if the key Zabbix is defined in the config context if not "zabbix" in self.nb.config_context: return False - if ("proxy" in self.nb.config_context["zabbix"] and - not self.nb.config_context["zabbix"]["proxy"]): + if ( + "proxy" in self.nb.config_context["zabbix"] + and not self.nb.config_context["zabbix"]["proxy"] + ): return False # Proxy group takes priority over a proxy due # to it being HA and therefore being more reliable # Includes proxy group fix since Zabbix <= 6 should ignore this proxy_types = ["proxy"] - if str(self.zabbix.version).startswith('7'): + if str(self.zabbix.version).startswith("7"): # Only insert groups in front of list for Zabbix7 proxy_types.insert(0, "proxy_group") for proxy_type in proxy_types: @@ -419,15 +478,23 @@ class PhysicalDevice(): continue # If the proxy name matches if proxy["name"] == proxy_name: - self.logger.debug(f"Host {self.name}: using {proxy['type']}" - f" {proxy_name}") + self.logger.debug( + f"Host {self.name}: using {proxy['type']}" f" {proxy_name}" + ) self.zbxproxy = proxy return True - self.logger.warning(f"Host {self.name}: unable to find proxy {proxy_name}") + self.logger.warning( + f"Host {self.name}: unable to find proxy {proxy_name}" + ) return False - def createInZabbix(self, groups, templates, proxies, - description="Host added by NetBox sync script."): + def createInZabbix( + self, + groups, + templates, + proxies, + description="Host added by NetBox sync script.", + ): """ Creates Zabbix host object with parameters from NetBox object. """ @@ -435,37 +502,40 @@ class PhysicalDevice(): if not self._zabbixHostnameExists(): # Set group and template ID's for host if not self.setZabbixGroupID(groups): - e = (f"Unable to find group '{self.hostgroup}' " - f"for host {self.name} in Zabbix.") + e = ( + f"Unable to find group '{self.hostgroup}' " + f"for host {self.name} in Zabbix." + ) self.logger.warning(e) raise SyncInventoryError(e) self.zbxTemplatePrepper(templates) templateids = [] for template in self.zbx_templates: - templateids.append({'templateid': template['templateid']}) + templateids.append({"templateid": template["templateid"]}) # Set interface, group and template configuration interfaces = self.setInterfaceDetails() groups = [{"groupid": self.group_id}] # Set Zabbix proxy if defined self.setProxy(proxies) # Set basic data for host creation - create_data = {"host": self.name, - "name": self.visible_name, - "status": self.zabbix_state, - "interfaces": interfaces, - "groups": groups, - "templates": templateids, - "description": description, - "inventory_mode": self.inventory_mode, - "inventory": self.inventory, - "macros": self.usermacros, - "tags": self.tags - } + create_data = { + "host": self.name, + "name": self.visible_name, + "status": self.zabbix_state, + "interfaces": interfaces, + "groups": groups, + "templates": templateids, + "description": description, + "inventory_mode": self.inventory_mode, + "inventory": self.inventory, + "macros": self.usermacros, + "tags": self.tags, + } # If a Zabbix proxy or Zabbix Proxy group has been defined if self.zbxproxy: # If a lower version than 7 is used, we can assume that # the proxy is a normal proxy and not a proxy group - if not str(self.zabbix.version).startswith('7'): + if not str(self.zabbix.version).startswith("7"): create_data["proxy_hostid"] = self.zbxproxy["id"] else: # Configure either a proxy or proxy group @@ -496,8 +566,8 @@ class PhysicalDevice(): """ final_data = [] # Check if the hostgroup is in a nested format and check each parent - for pos in range(len(self.hostgroup.split('/'))): - zabbix_hg = self.hostgroup.rsplit('/', pos)[0] + for pos in range(len(self.hostgroup.split("/"))): + zabbix_hg = self.hostgroup.rsplit("/", pos)[0] if self.lookupZabbixHostgroup(hostgroups, zabbix_hg): # Hostgroup already exists continue @@ -508,7 +578,9 @@ class PhysicalDevice(): e = f"Hostgroup '{zabbix_hg}': created in Zabbix." self.logger.info(e) # Add group to final data - final_data.append({'groupid': groupid["groupids"][0], 'name': zabbix_hg}) + final_data.append( + {"groupid": groupid["groupids"][0], "name": zabbix_hg} + ) except APIRequestError as e: msg = f"Hostgroup '{zabbix_hg}': unable to create. Zabbix returned {str(e)}." self.logger.error(msg) @@ -535,20 +607,24 @@ class PhysicalDevice(): try: self.zabbix.host.update(hostid=self.zabbix_id, **kwargs) except APIRequestError as e: - e = (f"Host {self.name}: Unable to update. " - f"Zabbix returned the following error: {str(e)}.") + e = ( + f"Host {self.name}: Unable to update. " + f"Zabbix returned the following error: {str(e)}." + ) self.logger.error(e) raise SyncExternalError(e) from None self.logger.info(f"Updated host {self.name} with data {kwargs}.") self.create_journal_entry("info", "Updated host in Zabbix with latest NB data.") - def ConsistencyCheck(self, groups, templates, proxies, proxy_power, create_hostgroups): + def ConsistencyCheck( + self, groups, templates, proxies, proxy_power, create_hostgroups + ): # pylint: disable=too-many-branches, too-many-statements """ Checks if Zabbix object is still valid with NetBox parameters. """ # If group is found or if the hostgroup is nested - if not self.setZabbixGroupID(groups) or len(self.hostgroup.split('/')) > 1: + if not self.setZabbixGroupID(groups) or len(self.hostgroup.split("/")) > 1: if create_hostgroups: # Script is allowed to create a new hostgroup new_groups = self.createZabbixHostgroup(groups) @@ -559,50 +635,59 @@ class PhysicalDevice(): if not self.group_id: # Function returns true / false but also sets GroupID if not self.setZabbixGroupID(groups) and not create_hostgroups: - e = (f"Host {self.name}: different hostgroup is required but " - "unable to create hostgroup without generation permission.") + e = ( + f"Host {self.name}: different hostgroup is required but " + "unable to create hostgroup without generation permission." + ) self.logger.warning(e) raise SyncInventoryError(e) # Prepare templates and proxy config self.zbxTemplatePrepper(templates) self.setProxy(proxies) # Get host object from Zabbix - host = self.zabbix.host.get(filter={'hostid': self.zabbix_id}, - selectInterfaces=['type', 'ip', - 'port', 'details', - 'interfaceid'], - selectGroups=["groupid"], - selectHostGroups=["groupid"], - selectParentTemplates=["templateid"], - selectInventory=list(self._inventory_map().values()), - selectMacros=["macro","value","type","description"], - selectTags=["tag","value"] - ) + host = self.zabbix.host.get( + filter={"hostid": self.zabbix_id}, + selectInterfaces=["type", "ip", "port", "details", "interfaceid"], + selectGroups=["groupid"], + selectHostGroups=["groupid"], + selectParentTemplates=["templateid"], + selectInventory=list(self._inventory_map().values()), + selectMacros=["macro", "value", "type", "description"], + selectTags=["tag", "value"], + ) if len(host) > 1: - e = (f"Got {len(host)} results for Zabbix hosts " - f"with ID {self.zabbix_id} - hostname {self.name}.") + e = ( + f"Got {len(host)} results for Zabbix hosts " + f"with ID {self.zabbix_id} - hostname {self.name}." + ) self.logger.error(e) raise SyncInventoryError(e) if len(host) == 0: - e = (f"Host {self.name}: No Zabbix host found. " - f"This is likely the result of a deleted Zabbix host " - f"without zeroing the ID field in NetBox.") + e = ( + f"Host {self.name}: No Zabbix host found. " + f"This is likely the result of a deleted Zabbix host " + f"without zeroing the ID field in NetBox." + ) self.logger.error(e) raise SyncInventoryError(e) host = host[0] if host["host"] == self.name: self.logger.debug(f"Host {self.name}: hostname in-sync.") else: - self.logger.warning(f"Host {self.name}: hostname OUT of sync. " - f"Received value: {host['host']}") + self.logger.warning( + f"Host {self.name}: hostname OUT of sync. " + f"Received value: {host['host']}" + ) self.updateZabbixHost(host=self.name) # Execute check depending on wether the name is special or not if self.use_visible_name: if host["name"] == self.visible_name: self.logger.debug(f"Host {self.name}: visible name in-sync.") else: - self.logger.warning(f"Host {self.name}: visible name OUT of sync." - f" Received value: {host['name']}") + self.logger.warning( + f"Host {self.name}: visible name OUT of sync." + f" Received value: {host['name']}" + ) self.updateZabbixHost(name=self.visible_name) # Check if the templates are in-sync @@ -611,23 +696,24 @@ class PhysicalDevice(): # Prepare Templates for API parsing templateids = [] for template in self.zbx_templates: - templateids.append({'templateid': template['templateid']}) + templateids.append({"templateid": template["templateid"]}) # Update Zabbix with NB templates and clear any old / lost templates - self.updateZabbixHost(templates_clear=host["parentTemplates"], - templates=templateids) + self.updateZabbixHost( + templates_clear=host["parentTemplates"], templates=templateids + ) else: self.logger.debug(f"Host {self.name}: template(s) in-sync.") # Check if Zabbix version is 6 or higher. Issue #93 group_dictname = "hostgroups" - if str(self.zabbix.version).startswith(('6', '5')): + if str(self.zabbix.version).startswith(("6", "5")): group_dictname = "groups" for group in host[group_dictname]: if group["groupid"] == self.group_id: self.logger.debug(f"Host {self.name}: hostgroup in-sync.") break self.logger.warning(f"Host {self.name}: hostgroup OUT of sync.") - self.updateZabbixHost(groups={'groupid': self.group_id}) + self.updateZabbixHost(groups={"groupid": self.group_id}) if int(host["status"]) == self.zabbix_state: self.logger.debug(f"Host {self.name}: status in-sync.") @@ -637,8 +723,10 @@ class PhysicalDevice(): # Check if a proxy has been defined if self.zbxproxy: # Check if proxy or proxy group is defined - if (self.zbxproxy["idtype"] in host and - host[self.zbxproxy["idtype"]] == self.zbxproxy["id"]): + if ( + self.zbxproxy["idtype"] in host + and host[self.zbxproxy["idtype"]] == self.zbxproxy["id"] + ): self.logger.debug(f"Host {self.name}: proxy in-sync.") # Backwards compatibility for Zabbix <= 6 elif "proxy_hostid" in host and host["proxy_hostid"] == self.zbxproxy["id"]: @@ -647,13 +735,15 @@ class PhysicalDevice(): else: self.logger.warning(f"Host {self.name}: proxy OUT of sync.") # Zabbix <= 6 patch - if not str(self.zabbix.version).startswith('7'): - self.updateZabbixHost(proxy_hostid=self.zbxproxy['id']) + if not str(self.zabbix.version).startswith("7"): + self.updateZabbixHost(proxy_hostid=self.zbxproxy["id"]) # Zabbix 7+ else: # Prepare data structure for updating either proxy or group - update_data = {self.zbxproxy["idtype"]: self.zbxproxy["id"], - "monitored_by": self.zbxproxy['monitored_by']} + update_data = { + self.zbxproxy["idtype"]: self.zbxproxy["id"], + "monitored_by": self.zbxproxy["monitored_by"], + } self.updateZabbixHost(**update_data) else: # No proxy is defined in NetBox @@ -665,8 +755,10 @@ class PhysicalDevice(): proxy_set = True if proxy_power and proxy_set: # Zabbix <= 6 fix - self.logger.warning(f"Host {self.name}: no proxy is configured in NetBox " - "but is configured in Zabbix. Removing proxy config in Zabbix") + self.logger.warning( + f"Host {self.name}: no proxy is configured in NetBox " + "but is configured in Zabbix. Removing proxy config in Zabbix" + ) if "proxy_hostid" in host and bool(host["proxy_hostid"]): self.updateZabbixHost(proxy_hostid=0) # Zabbix 7 proxy @@ -678,21 +770,23 @@ class PhysicalDevice(): # Checks if a proxy has been defined in Zabbix and if proxy_power config has been set if proxy_set and not proxy_power: # Display error message - self.logger.error(f"Host {self.name} is configured " - f"with proxy in Zabbix but not in NetBox. The" - " -p flag was ommited: no " - "changes have been made.") + self.logger.error( + f"Host {self.name} is configured " + f"with proxy in Zabbix but not in NetBox. The" + " -p flag was ommited: no " + "changes have been made." + ) if not proxy_set: self.logger.debug(f"Host {self.name}: proxy in-sync.") # Check host inventory mode - if str(host['inventory_mode']) == str(self.inventory_mode): + if str(host["inventory_mode"]) == str(self.inventory_mode): self.logger.debug(f"Host {self.name}: inventory_mode in-sync.") else: self.logger.warning(f"Host {self.name}: inventory_mode OUT of sync.") self.updateZabbixHost(inventory_mode=str(self.inventory_mode)) - if inventory_sync and self.inventory_mode in [0,1]: + if inventory_sync and self.inventory_mode in [0, 1]: # Check host inventory mapping - if host['inventory'] == self.inventory: + if host["inventory"] == self.inventory: self.logger.debug(f"Host {self.name}: inventory in-sync.") else: self.logger.warning(f"Host {self.name}: inventory OUT of sync.") @@ -704,12 +798,12 @@ class PhysicalDevice(): # Do not re-sync secret usermacros unless sync is set to 'full' if str(usermacro_sync).lower() != "full": for m in deepcopy(self.usermacros): - if m['type'] == str(1): + if m["type"] == str(1): # Remove the value as the api doesn't return it # this will allow us to only update usermacros that don't exist - m.pop('value') + m.pop("value") macros_filtered.append(m) - if host['macros'] == self.usermacros or host['macros'] == macros_filtered: + if host["macros"] == self.usermacros or host["macros"] == macros_filtered: self.logger.debug(f"Host {self.name}: usermacros in-sync.") else: self.logger.warning(f"Host {self.name}: usermacros OUT of sync.") @@ -717,7 +811,7 @@ class PhysicalDevice(): # Check host usermacros if tag_sync: - if remove_duplicates(host['tags'],sortkey='tag') == self.tags: + if remove_duplicates(host["tags"], sortkey="tag") == self.tags: self.logger.debug(f"Host {self.name}: tags in-sync.") else: self.logger.warning(f"Host {self.name}: tags OUT of sync.") @@ -725,7 +819,7 @@ class PhysicalDevice(): # If only 1 interface has been found # pylint: disable=too-many-nested-blocks - if len(host['interfaces']) == 1: + if len(host["interfaces"]) == 1: updates = {} # Go through each key / item and check if it matches Zabbix for key, item in self.setInterfaceDetails()[0].items(): @@ -733,7 +827,7 @@ class PhysicalDevice(): if key in host["interfaces"][0]: # If SNMP is used, go through nested dict # to compare SNMP parameters - if isinstance(item,dict) and key == "details": + if isinstance(item, dict) and key == "details": for k, i in item.items(): if k in host["interfaces"][0][key]: # Set update if values don't match @@ -761,12 +855,14 @@ class PhysicalDevice(): self.logger.warning(f"Host {self.name}: Interface OUT of sync.") if "type" in updates: # Changing interface type not supported. Raise exception. - e = (f"Host {self.name}: changing interface type to " - f"{str(updates['type'])} is not supported.") + e = ( + f"Host {self.name}: changing interface type to " + f"{str(updates['type'])} is not supported." + ) self.logger.error(e) raise InterfaceConfigError(e) # Set interfaceID for Zabbix config - updates["interfaceid"] = host["interfaces"][0]['interfaceid'] + updates["interfaceid"] = host["interfaces"][0]["interfaceid"] try: # API call to Zabbix self.zabbix.hostinterface.update(updates) @@ -782,9 +878,11 @@ class PhysicalDevice(): e = f"Host {self.name}: interface in-sync." self.logger.debug(e) else: - e = (f"Host {self.name} has unsupported interface configuration." - f" Host has total of {len(host['interfaces'])} interfaces. " - "Manual interfention required.") + e = ( + f"Host {self.name} has unsupported interface configuration." + f" Host has total of {len(host['interfaces'])} interfaces. " + "Manual interfention required." + ) self.logger.error(e) raise SyncInventoryError(e) @@ -796,20 +894,25 @@ class PhysicalDevice(): if self.journal: # Check if the severity is valid if severity not in ["info", "success", "warning", "danger"]: - self.logger.warning(f"Value {severity} not valid for NB journal entries.") + self.logger.warning( + f"Value {severity} not valid for NB journal entries." + ) return False - journal = {"assigned_object_type": "dcim.device", - "assigned_object_id": self.id, - "kind": severity, - "comments": message - } + journal = { + "assigned_object_type": "dcim.device", + "assigned_object_id": self.id, + "kind": severity, + "comments": message, + } try: self.nb_journals.create(journal) self.logger.debug(f"Host {self.name}: Created journal entry in NetBox") return True except JournalError(e) as e: - self.logger.warning("Unable to create journal entry for " - f"{self.name}: NB returned {e}") + self.logger.warning( + "Unable to create journal entry for " + f"{self.name}: NB returned {e}" + ) return False return False @@ -832,10 +935,15 @@ class PhysicalDevice(): # and add this NB template to the list of successfull templates tmpls_from_zabbix.pop(pos) succesfull_templates.append(nb_tmpl) - self.logger.debug(f"Host {self.name}: template " - f"{nb_tmpl['name']} is present in Zabbix.") + self.logger.debug( + f"Host {self.name}: template " + f"{nb_tmpl['name']} is present in Zabbix." + ) break - if len(succesfull_templates) == len(self.zbx_templates) and len(tmpls_from_zabbix) == 0: + if ( + len(succesfull_templates) == len(self.zbx_templates) + and len(tmpls_from_zabbix) == 0 + ): # All of the NetBox templates have been confirmed as successfull # and the ZBX template list is empty. This means that # all of the templates match. diff --git a/modules/exceptions.py b/modules/exceptions.py index 27a141c..ddac2b0 100644 --- a/modules/exceptions.py +++ b/modules/exceptions.py @@ -2,35 +2,47 @@ """ All custom exceptions used for Exception generation """ + + class SyncError(Exception): - """ Class SyncError """ + """Class SyncError""" + class JournalError(Exception): - """ Class SyncError """ + """Class SyncError""" + class SyncExternalError(SyncError): - """ Class SyncExternalError """ + """Class SyncExternalError""" + class SyncInventoryError(SyncError): - """ Class SyncInventoryError """ + """Class SyncInventoryError""" + class SyncDuplicateError(SyncError): - """ Class SyncDuplicateError """ + """Class SyncDuplicateError""" + class EnvironmentVarError(SyncError): - """ Class EnvironmentVarError """ + """Class EnvironmentVarError""" + class InterfaceConfigError(SyncError): - """ Class InterfaceConfigError """ + """Class InterfaceConfigError""" + class ProxyConfigError(SyncError): - """ Class ProxyConfigError """ + """Class ProxyConfigError""" + class HostgroupError(SyncError): - """ Class HostgroupError """ + """Class HostgroupError""" + class TemplateError(SyncError): - """ Class TemplateError """ + """Class TemplateError""" + class UsermacroError(SyncError): - """ Class UsermacroError """ + """Class UsermacroError""" diff --git a/modules/hostgroups.py b/modules/hostgroups.py index 6e2db75..c67f5e6 100644 --- a/modules/hostgroups.py +++ b/modules/hostgroups.py @@ -1,14 +1,26 @@ """Module for all hostgroup related code""" + from logging import getLogger + from modules.exceptions import HostgroupError from modules.tools import build_path -class Hostgroup(): + +class Hostgroup: """Hostgroup class for devices and VM's Takes type (vm or dev) and NB object""" - def __init__(self, obj_type, nb_obj, version, logger=None, #pylint: disable=too-many-arguments, too-many-positional-arguments - nested_sitegroup_flag=False, nested_region_flag=False, - nb_regions=None, nb_groups=None): + + def __init__( + self, + obj_type, + nb_obj, + version, + logger=None, # pylint: disable=too-many-arguments, too-many-positional-arguments + nested_sitegroup_flag=False, + nested_region_flag=False, + nb_regions=None, + nb_groups=None, + ): self.logger = logger if logger else getLogger(__name__) if obj_type not in ("vm", "dev"): msg = f"Unable to create hostgroup with type {type}" @@ -19,8 +31,9 @@ class Hostgroup(): self.name = self.nb.name self.nb_version = version # Used for nested data objects - self.set_nesting(nested_sitegroup_flag, nested_region_flag, - nb_groups, nb_regions) + self.set_nesting( + nested_sitegroup_flag, nested_region_flag, nb_groups, nb_regions + ) self._set_format_options() def __str__(self): @@ -49,20 +62,28 @@ class Hostgroup(): format_options["site_group"] = None if self.nb.site: if self.nb.site.region: - format_options["region"] = self.generate_parents("region", - str(self.nb.site.region)) + format_options["region"] = self.generate_parents( + "region", str(self.nb.site.region) + ) if self.nb.site.group: - format_options["site_group"] = self.generate_parents("site_group", - str(self.nb.site.group)) + format_options["site_group"] = self.generate_parents( + "site_group", str(self.nb.site.group) + ) format_options["role"] = role format_options["site"] = self.nb.site.name if self.nb.site else None format_options["tenant"] = str(self.nb.tenant) if self.nb.tenant else None - format_options["tenant_group"] = str(self.nb.tenant.group) if self.nb.tenant else None - format_options["platform"] = self.nb.platform.name if self.nb.platform else None + format_options["tenant_group"] = ( + str(self.nb.tenant.group) if self.nb.tenant else None + ) + format_options["platform"] = ( + self.nb.platform.name if self.nb.platform else None + ) # Variables only applicable for devices if self.type == "dev": format_options["manufacturer"] = self.nb.device_type.manufacturer.name - format_options["location"] = str(self.nb.location) if self.nb.location else None + format_options["location"] = ( + str(self.nb.location) if self.nb.location else None + ) # Variables only applicable for VM's if self.type == "vm": # Check if a cluster is configured. Could also be configured in a site. @@ -72,17 +93,22 @@ class Hostgroup(): self.format_options = format_options - def set_nesting(self, nested_sitegroup_flag, nested_region_flag, - nb_groups, nb_regions): + def set_nesting( + self, nested_sitegroup_flag, nested_region_flag, nb_groups, nb_regions + ): """Set nesting options for this Hostgroup""" - self.nested_objects = {"site_group": {"flag": nested_sitegroup_flag, "data": nb_groups}, - "region": {"flag": nested_region_flag, "data": nb_regions}} + self.nested_objects = { + "site_group": {"flag": nested_sitegroup_flag, "data": nb_groups}, + "region": {"flag": nested_region_flag, "data": nb_regions}, + } def generate(self, hg_format=None): """Generate hostgroup based on a provided format""" # Set format to default in case its not specified if not hg_format: - hg_format = "site/manufacturer/role" if self.type == "dev" else "cluster/role" + hg_format = ( + "site/manufacturer/role" if self.type == "dev" else "cluster/role" + ) # Split all given names hg_output = [] hg_items = hg_format.split("/") @@ -93,8 +119,10 @@ class Hostgroup(): cf_data = self.custom_field_lookup(hg_item) # CF does not exist if not cf_data["result"]: - msg = (f"Unable to generate hostgroup for host {self.name}. " - f"Item type {hg_item} not supported.") + msg = ( + f"Unable to generate hostgroup for host {self.name}. " + f"Item type {hg_item} not supported." + ) self.logger.error(msg) raise HostgroupError(msg) # CF data is populated @@ -109,10 +137,12 @@ class Hostgroup(): # Check if the hostgroup is populated with at least one item. if bool(hg_output): return "/".join(hg_output) - msg = (f"Unable to generate hostgroup for host {self.name}." - " Not enough valid items. This is most likely" - " due to the use of custom fields that are empty" - " or an invalid hostgroup format.") + msg = ( + f"Unable to generate hostgroup for host {self.name}." + " Not enough valid items. This is most likely" + " due to the use of custom fields that are empty" + " or an invalid hostgroup format." + ) self.logger.error(msg) raise HostgroupError(msg) @@ -157,7 +187,9 @@ class Hostgroup(): return child_object # If the nested flag is True, perform parent calculation if self.nested_objects[nest_type]["flag"]: - final_nested_object = build_path(child_object, self.nested_objects[nest_type]["data"]) + final_nested_object = build_path( + child_object, self.nested_objects[nest_type]["data"] + ) return "/".join(final_nested_object) # Nesting is not allowed for this object. Return child_object return child_object diff --git a/modules/interface.py b/modules/interface.py index e4413c6..1bd1e37 100644 --- a/modules/interface.py +++ b/modules/interface.py @@ -4,7 +4,8 @@ All of the Zabbix interface related configuration """ from modules.exceptions import InterfaceConfigError -class ZabbixInterface(): + +class ZabbixInterface: """Class that represents a Zabbix interface.""" def __init__(self, context, ip): @@ -15,21 +16,16 @@ class ZabbixInterface(): def _set_default_port(self): """Sets default TCP / UDP port for different interface types""" - interface_mapping = { - 1: 10050, - 2: 161, - 3: 623, - 4: 12345 - } + interface_mapping = {1: 10050, 2: 161, 3: 623, 4: 12345} # Check if interface type is listed in mapper. - if self.interface['type'] not in interface_mapping: + if self.interface["type"] not in interface_mapping: return False # Set default port to interface - self.interface["port"] = str(interface_mapping[self.interface['type']]) + self.interface["port"] = str(interface_mapping[self.interface["type"]]) return True def get_context(self): - """ check if NetBox custom context has been defined. """ + """check if NetBox custom context has been defined.""" if "zabbix" in self.context: zabbix = self.context["zabbix"] if "interface_type" in zabbix: @@ -43,7 +39,7 @@ class ZabbixInterface(): return False def set_snmp(self): - """ Check if interface is type SNMP """ + """Check if interface is type SNMP""" # pylint: disable=too-many-branches if self.interface["type"] == 2: # Checks if SNMP settings are defined in NetBox @@ -63,7 +59,7 @@ class ZabbixInterface(): e = "SNMP version option is not defined." raise InterfaceConfigError(e) # If version 1 or 2 is used, get community string - if self.interface["details"]["version"] in ['1','2']: + if self.interface["details"]["version"] in ["1", "2"]: if "community" in snmp: # Set SNMP community to confix context value community = snmp["community"] @@ -73,10 +69,16 @@ class ZabbixInterface(): self.interface["details"]["community"] = str(community) # If version 3 has been used, get all # SNMPv3 NetBox related configs - elif self.interface["details"]["version"] == '3': - items = ["securityname", "securitylevel", "authpassphrase", - "privpassphrase", "authprotocol", "privprotocol", - "contextname"] + elif self.interface["details"]["version"] == "3": + items = [ + "securityname", + "securitylevel", + "authpassphrase", + "privpassphrase", + "authprotocol", + "privprotocol", + "contextname", + ] for key, item in snmp.items(): if key in items: self.interface["details"][key] = str(item) @@ -91,13 +93,15 @@ class ZabbixInterface(): raise InterfaceConfigError(e) def set_default_snmp(self): - """ Set default config to SNMPv2, port 161 and community macro. """ + """Set default config to SNMPv2, port 161 and community macro.""" self.interface = self.skelet self.interface["type"] = "2" self.interface["port"] = "161" - self.interface["details"] = {"version": "2", - "community": "{$SNMP_COMMUNITY}", - "bulk": "1"} + self.interface["details"] = { + "version": "2", + "community": "{$SNMP_COMMUNITY}", + "bulk": "1", + } def set_default_agent(self): """Sets interface to Zabbix agent defaults""" diff --git a/modules/tags.py b/modules/tags.py index 4993cd3..9dda995 100644 --- a/modules/tags.py +++ b/modules/tags.py @@ -4,13 +4,24 @@ All of the Zabbix Usermacro related configuration """ from logging import getLogger + from modules.tools import field_mapper, remove_duplicates -class ZabbixTags(): + +class ZabbixTags: """Class that represents a Zabbix interface.""" - def __init__(self, nb, tag_map, tag_sync, tag_lower=True, - tag_name=None, tag_value=None, logger=None, host=None): + def __init__( + self, + nb, + tag_map, + tag_sync, + tag_lower=True, + tag_name=None, + tag_value=None, + logger=None, + host=None, + ): self.nb = nb self.name = host if host else nb.name self.tag_map = tag_map @@ -42,7 +53,7 @@ class ZabbixTags(): """ Validates tag name """ - if tag_name and isinstance(tag_name, str) and len(tag_name)<=256: + if tag_name and isinstance(tag_name, str) and len(tag_name) <= 256: return True return False @@ -50,7 +61,7 @@ class ZabbixTags(): """ Validates tag value """ - if tag_value and isinstance(tag_value, str) and len(tag_value)<=256: + if tag_value and isinstance(tag_value, str) and len(tag_value) <= 256: return True return False @@ -58,23 +69,25 @@ class ZabbixTags(): """ Renders a tag """ - tag={} + tag = {} if self.validate_tag(tag_name): if self.lower: - tag['tag'] = tag_name.lower() + tag["tag"] = tag_name.lower() else: - tag['tag'] = tag_name + tag["tag"] = tag_name else: - self.logger.error(f'Tag {tag_name} is not a valid tag name, skipping.') + self.logger.error(f"Tag {tag_name} is not a valid tag name, skipping.") return False if self.validate_value(tag_value): if self.lower: - tag['value'] = tag_value.lower() + tag["value"] = tag_value.lower() else: - tag['value'] = tag_value + tag["value"] = tag_value else: - self.logger.error(f'Tag {tag_name} has an invalid value: \'{tag_value}\', skipping.') + self.logger.error( + f"Tag {tag_name} has an invalid value: '{tag_value}', skipping." + ) return False return tag @@ -83,7 +96,7 @@ class ZabbixTags(): Generate full set of Usermacros """ # pylint: disable=too-many-branches - tags=[] + tags = [] # Parse the field mapper for tags if self.tag_map: self.logger.debug(f"Host {self.nb.name}: Starting tag mapper") @@ -94,9 +107,12 @@ class ZabbixTags(): tags.append(t) # Parse NetBox config context for tags - if ("zabbix" in self.nb.config_context and "tags" in self.nb.config_context['zabbix'] - and isinstance(self.nb.config_context['zabbix']['tags'], list)): - for tag in self.nb.config_context['zabbix']['tags']: + if ( + "zabbix" in self.nb.config_context + and "tags" in self.nb.config_context["zabbix"] + and isinstance(self.nb.config_context["zabbix"]["tags"], list) + ): + for tag in self.nb.config_context["zabbix"]["tags"]: if isinstance(tag, dict): for tagname, value in tag.items(): t = self.render_tag(tagname, value) @@ -106,12 +122,12 @@ class ZabbixTags(): # Pull in NetBox device tags if tag_name is set if self.tag_name and isinstance(self.tag_name, str): for tag in self.nb.tags: - if self.tag_value.lower() in ['display', 'name', 'slug']: + if self.tag_value.lower() in ["display", "name", "slug"]: value = tag[self.tag_value] else: - value = tag['name'] + value = tag["name"] t = self.render_tag(self.tag_name, value) if t: tags.append(t) - return remove_duplicates(tags, sortkey='tag') + return remove_duplicates(tags, sortkey="tag") diff --git a/modules/tools.py b/modules/tools.py index 8d658a3..791025d 100644 --- a/modules/tools.py +++ b/modules/tools.py @@ -1,12 +1,14 @@ """A collection of tools used by several classes""" + def convert_recordset(recordset): - """ Converts netbox RedcordSet to list of dicts. """ + """Converts netbox RedcordSet to list of dicts.""" recordlist = [] for record in recordset: recordlist.append(record.__dict__) return recordlist + def build_path(endpoint, list_of_dicts): """ Builds a path list of related parent/child items. @@ -14,16 +16,17 @@ def build_path(endpoint, list_of_dicts): be used in hostgroups. """ item_path = [] - itemlist = [i for i in list_of_dicts if i['name'] == endpoint] + itemlist = [i for i in list_of_dicts if i["name"] == endpoint] item = itemlist[0] if len(itemlist) == 1 else None - item_path.append(item['name']) - while item['_depth'] > 0: - itemlist = [i for i in list_of_dicts if i['name'] == str(item['parent'])] + item_path.append(item["name"]) + while item["_depth"] > 0: + itemlist = [i for i in list_of_dicts if i["name"] == str(item["parent"])] item = itemlist[0] if len(itemlist) == 1 else None - item_path.append(item['name']) + item_path.append(item["name"]) item_path.reverse() return item_path + def proxy_prepper(proxy_list, proxy_group_list): """ Function that takes 2 lists and converts them using a @@ -44,15 +47,16 @@ def proxy_prepper(proxy_list, proxy_group_list): output.append(group) return output + def field_mapper(host, mapper, nbdevice, logger): """ Maps NetBox field data to Zabbix properties. Used for Inventory, Usermacros and Tag mappings. """ - data={} + data = {} # Let's build an dict for each property in the map for nb_field, zbx_field in mapper.items(): - field_list = nb_field.split("/") # convert str to list based on delimiter + field_list = nb_field.split("/") # convert str to list based on delimiter # start at the base of the dict... value = nbdevice # ... and step through the dict till we find the needed value @@ -61,22 +65,30 @@ def field_mapper(host, mapper, nbdevice, logger): # Check if the result is usable and expected # We want to apply any int or float 0 values, # even if python thinks those are empty. - if ((value and isinstance(value, int | float | str )) or - (isinstance(value, int | float) and int(value) ==0)): + if (value and isinstance(value, int | float | str)) or ( + isinstance(value, int | float) and int(value) == 0 + ): data[zbx_field] = str(value) elif not value: # empty value should just be an empty string for API compatibility - logger.debug(f"Host {host}: NetBox lookup for " - f"'{nb_field}' returned an empty value") + logger.debug( + f"Host {host}: NetBox lookup for " + f"'{nb_field}' returned an empty value" + ) data[zbx_field] = "" else: # Value is not a string or numeral, probably not what the user expected. - logger.error(f"Host {host}: Lookup for '{nb_field}'" - " returned an unexpected type: it will be skipped.") - logger.debug(f"Host {host}: Field mapping complete. " - f"Mapped {len(list(filter(None, data.values())))} field(s)") + logger.error( + f"Host {host}: Lookup for '{nb_field}'" + " returned an unexpected type: it will be skipped." + ) + logger.debug( + f"Host {host}: Field mapping complete. " + f"Mapped {len(list(filter(None, data.values())))} field(s)" + ) return data + def remove_duplicates(input_list, sortkey=None): """ Removes duplicate entries from a list and sorts the list diff --git a/modules/usermacros.py b/modules/usermacros.py index 71efbde..29580d1 100644 --- a/modules/usermacros.py +++ b/modules/usermacros.py @@ -3,11 +3,13 @@ """ All of the Zabbix Usermacro related configuration """ -from re import match from logging import getLogger +from re import match + from modules.tools import field_mapper -class ZabbixUsermacros(): + +class ZabbixUsermacros: """Class that represents a Zabbix interface.""" def __init__(self, nb, usermacro_map, usermacro_sync, logger=None, host=None): @@ -42,40 +44,46 @@ class ZabbixUsermacros(): """ Validates usermacro name """ - pattern = r'\{\$[A-Z0-9\._]*(\:.*)?\}' + pattern = r"\{\$[A-Z0-9\._]*(\:.*)?\}" return match(pattern, macro_name) def render_macro(self, macro_name, macro_properties): """ Renders a full usermacro from partial input """ - macro={} - macrotypes={'text': 0, 'secret': 1, 'vault': 2} + macro = {} + macrotypes = {"text": 0, "secret": 1, "vault": 2} if self.validate_macro(macro_name): - macro['macro'] = str(macro_name) + macro["macro"] = str(macro_name) if isinstance(macro_properties, dict): - if not 'value' in macro_properties: - self.logger.error(f'Usermacro {macro_name} has no value, skipping.') + if not "value" in macro_properties: + self.logger.error(f"Usermacro {macro_name} has no value, skipping.") return False - macro['value'] = macro_properties['value'] + macro["value"] = macro_properties["value"] - if 'type' in macro_properties and macro_properties['type'].lower() in macrotypes: - macro['type'] = str(macrotypes[macro_properties['type']]) + if ( + "type" in macro_properties + and macro_properties["type"].lower() in macrotypes + ): + macro["type"] = str(macrotypes[macro_properties["type"]]) else: - macro['type'] = str(0) + macro["type"] = str(0) - if ('description' in macro_properties and - isinstance(macro_properties['description'], str)): - macro['description'] = macro_properties['description'] + if "description" in macro_properties and isinstance( + macro_properties["description"], str + ): + macro["description"] = macro_properties["description"] else: - macro['description'] = "" + macro["description"] = "" elif isinstance(macro_properties, str): - macro['value'] = macro_properties - macro['type'] = str(0) - macro['description'] = "" + macro["value"] = macro_properties + macro["type"] = str(0) + macro["description"] = "" else: - self.logger.error(f'Usermacro {macro_name} is not a valid usermacro name, skipping.') + self.logger.error( + f"Usermacro {macro_name} is not a valid usermacro name, skipping." + ) return False return macro @@ -83,18 +91,25 @@ class ZabbixUsermacros(): """ Generate full set of Usermacros """ - macros=[] + macros = [] # Parse the field mapper for usermacros if self.usermacro_map: self.logger.debug(f"Host {self.nb.name}: Starting usermacro mapper") - field_macros = field_mapper(self.nb.name, self.usermacro_map, self.nb, self.logger) + field_macros = field_mapper( + self.nb.name, self.usermacro_map, self.nb, self.logger + ) for macro, value in field_macros.items(): m = self.render_macro(macro, value) if m: macros.append(m) # Parse NetBox config context for usermacros - if "zabbix" in self.nb.config_context and "usermacros" in self.nb.config_context['zabbix']: - for macro, properties in self.nb.config_context['zabbix']['usermacros'].items(): + if ( + "zabbix" in self.nb.config_context + and "usermacros" in self.nb.config_context["zabbix"] + ): + for macro, properties in self.nb.config_context["zabbix"][ + "usermacros" + ].items(): m = self.render_macro(macro, properties) if m: macros.append(m) diff --git a/modules/virtual_machine.py b/modules/virtual_machine.py index 273f9e7..6038811 100644 --- a/modules/virtual_machine.py +++ b/modules/virtual_machine.py @@ -3,55 +3,66 @@ """Module that hosts all functions for virtual machine processing""" from os import sys + from modules.device import PhysicalDevice +from modules.exceptions import InterfaceConfigError, SyncInventoryError, TemplateError from modules.hostgroups import Hostgroup from modules.interface import ZabbixInterface -from modules.exceptions import TemplateError, InterfaceConfigError, SyncInventoryError + try: from config import ( - vm_inventory_map, - vm_usermacro_map, - vm_tag_map, + traverse_regions, traverse_site_groups, - traverse_regions + vm_inventory_map, + vm_tag_map, + vm_usermacro_map, ) except ModuleNotFoundError: - print("Configuration file config.py not found in main directory." - "Please create the file or rename the config.py.example file to config.py.") + print( + "Configuration file config.py not found in main directory." + "Please create the file or rename the config.py.example file to config.py." + ) sys.exit(0) + class VirtualMachine(PhysicalDevice): """Model for virtual machines""" + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.hostgroup = None self.zbx_template_names = None def _inventory_map(self): - """ use VM inventory maps """ + """use VM inventory maps""" return vm_inventory_map def _usermacro_map(self): - """ use VM usermacro maps """ + """use VM usermacro maps""" return vm_usermacro_map def _tag_map(self): - """ use VM tag maps """ + """use VM tag maps""" return vm_tag_map def set_hostgroup(self, hg_format, nb_site_groups, nb_regions): """Set the hostgroup for this device""" # Create new Hostgroup instance - hg = Hostgroup("vm", self.nb, self.nb_api_version, logger=self.logger, - nested_sitegroup_flag=traverse_site_groups, - nested_region_flag=traverse_regions, - nb_groups=nb_site_groups, - nb_regions=nb_regions) + hg = Hostgroup( + "vm", + self.nb, + self.nb_api_version, + logger=self.logger, + nested_sitegroup_flag=traverse_site_groups, + nested_region_flag=traverse_regions, + nb_groups=nb_site_groups, + nb_regions=nb_regions, + ) # Generate hostgroup based on hostgroup format self.hostgroup = hg.generate(hg_format) def set_vm_template(self): - """ Set Template for VMs. Overwrites default class + """Set Template for VMs. Overwrites default class to skip a lookup of custom fields.""" # Gather templates ONLY from the device specific context try: @@ -60,7 +71,7 @@ class VirtualMachine(PhysicalDevice): self.logger.warning(e) return True - def setInterfaceDetails(self): # pylint: disable=invalid-name + def setInterfaceDetails(self): # pylint: disable=invalid-name """ Overwrites device function to select an agent interface type by default Agent type interfaces are more likely to be used with VMs then SNMP