diff --git a/modules/device.py b/modules/device.py index 61a11c6..dcae69c 100644 --- a/modules/device.py +++ b/modules/device.py @@ -26,6 +26,7 @@ from modules.config import load_config config = load_config() + class PhysicalDevice: # pylint: disable=too-many-instance-attributes, too-many-arguments, too-many-positional-arguments """ @@ -97,7 +98,7 @@ class PhysicalDevice: if config["device_cf"] in self.nb.custom_fields: self.zabbix_id = self.nb.custom_fields[config["device_cf"]] else: - e = f'Host {self.name}: Custom field {config["device_cf"]} not present' + e = f"Host {self.name}: Custom field {config['device_cf']} not present" self.logger.error(e) raise SyncInventoryError(e) @@ -111,9 +112,10 @@ class PhysicalDevice: self.visible_name = self.nb.name self.use_visible_name = True self.logger.info( - f"Host {self.visible_name} contains special characters. " - f"Using {self.name} as name for the NetBox object " - f"and using {self.visible_name} as visible name in Zabbix." + "Host %s contains special characters. Using %s as name for the NetBox object and using %s as visible name in Zabbix.", + self.visible_name, + self.name, + self.visible_name, ) else: pass @@ -126,8 +128,8 @@ class PhysicalDevice: self.nb, self.nb_api_version, logger=self.logger, - nested_sitegroup_flag=config['traverse_site_groups'], - nested_region_flag=config['traverse_regions'], + nested_sitegroup_flag=config["traverse_site_groups"], + nested_region_flag=config["traverse_regions"], nb_groups=nb_site_groups, nb_regions=nb_regions, ) @@ -139,12 +141,12 @@ class PhysicalDevice: # Remove duplicates and None values self.hostgroups = list(filter(None, list(set(self.hostgroups)))) if self.hostgroups: - self.logger.debug(f"Host {self.name}: Should be member " - f"of groups: {self.hostgroups}") + self.logger.debug( + "Host %s: Should be member of groups: %s", self.name, self.hostgroups + ) return True return False - def set_template(self, prefer_config_context, overrule_custom): """Set Template""" self.zbx_template_names = None @@ -210,9 +212,10 @@ class PhysicalDevice: # Set inventory mode. Default is disabled (see class init function). if config["inventory_mode"] == "disabled": if config["inventory_sync"]: - self.logger.error(f"Host {self.name}: Unable to map NetBox inventory to Zabbix. " - "Inventory sync is enabled in " - "config but inventory mode is disabled.") + self.logger.error( + "Host %s: Unable to map NetBox inventory to Zabbix. Inventory sync is enabled in config but inventory mode is disabled", + self.name, + ) return True if config["inventory_mode"] == "manual": self.inventory_mode = 0 @@ -220,17 +223,20 @@ class PhysicalDevice: self.inventory_mode = 1 else: self.logger.error( - f"Host {self.name}: Specified value for inventory mode in" - f" config is not valid. Got value {config['inventory_mode']}" + "Host %s: Specified value for inventory mode in config is not valid. Got value %s", + self.name, + config["inventory_mode"], ) return False self.inventory = {} if config["inventory_sync"] and self.inventory_mode in [0, 1]: - self.logger.debug(f"Host {self.name}: Starting inventory mapper.") + self.logger.debug("Host %s: Starting inventory mapper.", self.name) self.inventory = field_mapper( self.name, self._inventory_map(), nbdevice, self.logger ) - self.logger.debug(f"Host {self.name}: Resolved inventory: {self.inventory}") + self.logger.debug( + "Host %s: Resolved inventory: %s", self.name, self.inventory + ) return True def isCluster(self): @@ -268,13 +274,14 @@ class PhysicalDevice: masterid = self.getClusterMaster() if masterid == self.id: self.logger.info( - f"Host {self.name} is primary cluster member. " - f"Modifying hostname from {self.name} to " - + f"{self.nb.virtual_chassis.name}." + "Host %s is primary cluster member. Modifying hostname from %s to %s.", + self.name, + self.name, + self.nb.virtual_chassis.name, ) self.name = self.nb.virtual_chassis.name return True - self.logger.info(f"Host {self.name} is non-primary cluster member.") + self.logger.info("Host %s is non-primary cluster member.", self.name) return False def zbxTemplatePrepper(self, templates): @@ -306,8 +313,10 @@ class PhysicalDevice: "name": zbx_template["name"], } ) - e = (f"Host {self.name}: Found template '{zbx_template['name']}' " - f"(ID:{zbx_template['templateid']})") + e = ( + f"Host {self.name}: Found template '{zbx_template['name']}' " + f"(ID:{zbx_template['templateid']})" + ) self.logger.debug(e) # Return error should the template not be found in Zabbix if not template_match: @@ -331,7 +340,7 @@ class PhysicalDevice: self.group_ids.append({"groupid": group["groupid"]}) e = ( f"Host {self.name}: Matched group " - f"\"{group['name']}\" (ID:{group['groupid']})" + f'"{group["name"]}" (ID:{group["groupid"]})' ) self.logger.debug(e) if len(self.group_ids) == len(self.hostgroups): @@ -412,7 +421,7 @@ class PhysicalDevice: macros = ZabbixUsermacros( self.nb, self._usermacro_map(), - config['usermacro_sync'], + config["usermacro_sync"], logger=self.logger, host=self.name, ) @@ -430,14 +439,14 @@ class PhysicalDevice: tags = ZabbixTags( self.nb, self._tag_map(), - tag_sync=config['tag_sync'], - tag_lower=config['tag_lower'], - tag_name=config['tag_name'], - tag_value=config['tag_value'], + tag_sync=config["tag_sync"], + tag_lower=config["tag_lower"], + tag_name=config["tag_name"], + tag_value=config["tag_value"], logger=self.logger, host=self.name, ) - if config['tag_sync'] is False: + if config["tag_sync"] is False: self.tags = [] return False self.tags = tags.generate() @@ -477,12 +486,12 @@ class PhysicalDevice: # If the proxy name matches if proxy["name"] == proxy_name: self.logger.debug( - f"Host {self.name}: using {proxy['type']}" f" '{proxy_name}'" + "Host %s: using {proxy['type']} '%s'", self.name, proxy_name ) self.zbxproxy = proxy return True self.logger.warning( - f"Host {self.name}: unable to find proxy {proxy_name}" + "Host %s: unable to find proxy %s", self.name, proxy_name ) return False @@ -554,7 +563,7 @@ class PhysicalDevice: self.create_journal_entry("success", msg) else: self.logger.error( - f"Host {self.name}: Unable to add to Zabbix. Host already present." + "Host %s: Unable to add to Zabbix. Host already present.", self.name ) def createZabbixHostgroup(self, hostgroups): @@ -612,7 +621,9 @@ class PhysicalDevice: ) self.logger.error(e) raise SyncExternalError(e) from None - self.logger.info(f"Host {self.name}: updated with data {sanatize_log_output(kwargs)}.") + self.logger.info( + "Host %s: updated with data %s.", self.name, sanatize_log_output(kwargs) + ) self.create_journal_entry("info", "Updated host in Zabbix with latest NB data.") def ConsistencyCheck( @@ -623,7 +634,7 @@ class PhysicalDevice: Checks if Zabbix object is still valid with NetBox parameters. """ # If group is found or if the hostgroup is nested - if not self.setZabbixGroupID(groups): # or len(self.hostgroups.split("/")) > 1: + if not self.setZabbixGroupID(groups): # or len(self.hostgroups.split("/")) > 1: if create_hostgroups: # Script is allowed to create a new hostgroup new_groups = self.createZabbixHostgroup(groups) @@ -672,28 +683,30 @@ class PhysicalDevice: raise SyncInventoryError(e) host = host[0] if host["host"] == self.name: - self.logger.debug(f"Host {self.name}: Hostname in-sync.") + self.logger.debug("Host %s: Hostname in-sync.", self.name) else: self.logger.info( - f"Host {self.name}: Hostname OUT of sync. " - f"Received value: {host['host']}" + "Host %s: Hostname OUT of sync. Received value: %s", + self.name, + host["host"], ) self.updateZabbixHost(host=self.name) # Execute check depending on wether the name is special or not if self.use_visible_name: if host["name"] == self.visible_name: - self.logger.debug(f"Host {self.name}: Visible name in-sync.") + self.logger.debug("Host %s: Visible name in-sync.", self.name) else: self.logger.info( - f"Host {self.name}: Visible name OUT of sync." - f" Received value: {host['name']}" + "Host %s: Visible name OUT of sync. Received value: %s", + self.name, + host["name"], ) self.updateZabbixHost(name=self.visible_name) # Check if the templates are in-sync if not self.zbx_template_comparer(host["parentTemplates"]): - self.logger.info(f"Host {self.name}: Template(s) OUT of sync.") + self.logger.info("Host %s: Template(s) OUT of sync.", self.name) # Prepare Templates for API parsing templateids = [] for template in self.zbx_templates: @@ -703,38 +716,41 @@ class PhysicalDevice: templates_clear=host["parentTemplates"], templates=templateids ) else: - self.logger.debug(f"Host {self.name}: Template(s) in-sync.") + self.logger.debug("Host %s: Template(s) in-sync.", self.name) # Check if Zabbix version is 6 or higher. Issue #93 group_dictname = "hostgroups" if str(self.zabbix.version).startswith(("6", "5")): group_dictname = "groups" # Check if hostgroups match - if (sorted(host[group_dictname], key=itemgetter('groupid')) == - sorted(self.group_ids, key=itemgetter('groupid'))): - self.logger.debug(f"Host {self.name}: Hostgroups in-sync.") + if sorted(host[group_dictname], key=itemgetter("groupid")) == sorted( + self.group_ids, key=itemgetter("groupid") + ): + self.logger.debug("Host %s: Hostgroups in-sync.", self.name) else: - self.logger.info(f"Host {self.name}: Hostgroups OUT of sync.") + self.logger.info("Host %s: Hostgroups OUT of sync.", self.name) self.updateZabbixHost(groups=self.group_ids) if int(host["status"]) == self.zabbix_state: - self.logger.debug(f"Host {self.name}: Status in-sync.") + self.logger.debug("Host %s: Status in-sync.", self.name) else: - self.logger.info(f"Host {self.name}: Status OUT of sync.") + self.logger.info("Host %s: Status OUT of sync.", self.name) self.updateZabbixHost(status=str(self.zabbix_state)) # Check if a proxy has been defined if self.zbxproxy: # Check if proxy or proxy group is defined - if (self.zbxproxy["idtype"] in host and - host[self.zbxproxy["idtype"]] == self.zbxproxy["id"]): - self.logger.debug(f"Host {self.name}: Proxy in-sync.") + if ( + self.zbxproxy["idtype"] in host + and host[self.zbxproxy["idtype"]] == self.zbxproxy["id"] + ): + self.logger.debug("Host %s: Proxy in-sync.", self.name) # Backwards compatibility for Zabbix <= 6 elif "proxy_hostid" in host and host["proxy_hostid"] == self.zbxproxy["id"]: - self.logger.debug(f"Host {self.name}: Proxy in-sync.") + self.logger.debug("Host %s: Proxy in-sync.", self.name) # Proxy does not match, update Zabbix else: - self.logger.info(f"Host {self.name}: Proxy OUT of sync.") + self.logger.info("Host %s: Proxy OUT of sync.", self.name) # Zabbix <= 6 patch if not str(self.zabbix.version).startswith("7"): self.updateZabbixHost(proxy_hostid=self.zbxproxy["id"]) @@ -757,8 +773,8 @@ class PhysicalDevice: if proxy_power and proxy_set: # Zabbix <= 6 fix self.logger.warning( - f"Host {self.name}: No proxy is configured in NetBox " - "but is configured in Zabbix. Removing proxy config in Zabbix" + "Host %s: No proxy is configured in NetBox but is configured in Zabbix. Removing proxy config in Zabbix", + self.name, ) if "proxy_hostid" in host and bool(host["proxy_hostid"]): self.updateZabbixHost(proxy_hostid=0) @@ -772,59 +788,59 @@ class PhysicalDevice: if proxy_set and not proxy_power: # Display error message self.logger.warning( - f"Host {self.name}: Is configured " - f"with proxy in Zabbix but not in NetBox. The" - " -p flag was ommited: no " - "changes have been made." + "Host %s: Is configured with proxy in Zabbix but not in NetBox. The -p flag was ommited: no changes have been made.", + self.name, ) if not proxy_set: - self.logger.debug(f"Host {self.name}: Proxy in-sync.") + self.logger.debug("Host %s: Proxy in-sync.", self.name) # Check host inventory mode if str(host["inventory_mode"]) == str(self.inventory_mode): - self.logger.debug(f"Host {self.name}: inventory_mode in-sync.") + self.logger.debug("Host %s: inventory_mode in-sync.", self.name) else: - self.logger.info(f"Host {self.name}: inventory_mode OUT of sync.") + self.logger.info("Host %s: inventory_mode OUT of sync.", self.name) self.updateZabbixHost(inventory_mode=str(self.inventory_mode)) if config["inventory_sync"] and self.inventory_mode in [0, 1]: # Check host inventory mapping if host["inventory"] == self.inventory: - self.logger.debug(f"Host {self.name}: Inventory in-sync.") + self.logger.debug("Host %s: Inventory in-sync.", self.name) else: - self.logger.info(f"Host {self.name}: Inventory OUT of sync.") + self.logger.info("Host %s: Inventory OUT of sync.", self.name) self.updateZabbixHost(inventory=self.inventory) # Check host usermacros - if config['usermacro_sync']: + if config["usermacro_sync"]: # Make a full copy synce we dont want to lose the original value # of secret type macros from Netbox netbox_macros = deepcopy(self.usermacros) # Set the sync bit - full_sync_bit = bool(str(config['usermacro_sync']).lower() == "full") + full_sync_bit = bool(str(config["usermacro_sync"]).lower() == "full") for macro in netbox_macros: # If the Macro is a secret and full sync is NOT activated if macro["type"] == str(1) and not full_sync_bit: # Remove the value as the Zabbix api does not return the value key # This is required when you want to do a diff between both lists macro.pop("value") + # Sort all lists def filter_with_macros(macro): return macro["macro"] + host["macros"].sort(key=filter_with_macros) netbox_macros.sort(key=filter_with_macros) # Check if both lists are the same if host["macros"] == netbox_macros: - self.logger.debug(f"Host {self.name}: Usermacros in-sync.") + self.logger.debug("Host %s: Usermacros in-sync.", self.name) else: - self.logger.info(f"Host {self.name}: Usermacros OUT of sync.") + self.logger.info("Host %s: Usermacros OUT of sync.", self.name) # Update Zabbix with NetBox usermacros self.updateZabbixHost(macros=self.usermacros) # Check host tags - if config['tag_sync']: + if config["tag_sync"]: if remove_duplicates(host["tags"], sortkey="tag") == self.tags: - self.logger.debug(f"Host {self.name}: Tags in-sync.") + self.logger.debug("Host %s: Tags in-sync.", self.name) else: - self.logger.info(f"Host {self.name}: Tags OUT of sync.") + self.logger.info("Host %s: Tags OUT of sync.", self.name) self.updateZabbixHost(tags=self.tags) # If only 1 interface has been found @@ -862,7 +878,7 @@ class PhysicalDevice: updates[key] = item if updates: # If interface updates have been found: push to Zabbix - self.logger.info(f"Host {self.name}: Interface OUT of sync.") + self.logger.info("Host %s: Interface OUT of sync.", self.name) if "type" in updates: # Changing interface type not supported. Raise exception. e = ( @@ -876,26 +892,27 @@ class PhysicalDevice: try: # API call to Zabbix self.zabbix.hostinterface.update(updates) - e = (f"Host {self.name}: Updated interface " - f"with data {sanatize_log_output(updates)}.") - self.logger.info(e) - self.create_journal_entry("info", e) + err_msg = ( + f"Host {self.name}: Updated interface " + f"with data {sanatize_log_output(updates)}." + ) + self.logger.info(err_msg) + self.create_journal_entry("info", err_msg) except APIRequestError as e: msg = f"Zabbix returned the following error: {str(e)}." self.logger.error(msg) raise SyncExternalError(msg) from e else: # If no updates are found, Zabbix interface is in-sync - e = f"Host {self.name}: Interface in-sync." - self.logger.debug(e) + self.logger.debug("Host %s: Interface in-sync.", self.name) else: - e = ( + err_msg = ( f"Host {self.name}: Has unsupported interface configuration." f" Host has total of {len(host['interfaces'])} interfaces. " "Manual intervention required." ) - self.logger.error(e) - raise SyncInventoryError(e) + self.logger.error(err_msg) + raise SyncInventoryError(err_msg) def create_journal_entry(self, severity, message): """ @@ -906,7 +923,7 @@ class PhysicalDevice: # Check if the severity is valid if severity not in ["info", "success", "warning", "danger"]: self.logger.warning( - f"Value {severity} not valid for NB journal entries." + "Value %s not valid for NB journal entries.", severity ) return False journal = { @@ -917,12 +934,11 @@ class PhysicalDevice: } try: self.nb_journals.create(journal) - self.logger.debug(f"Host {self.name}: Created journal entry in NetBox") + self.logger.debug("Host %s: Created journal entry in NetBox", self.name) return True except NetboxRequestError as e: self.logger.warning( - "Unable to create journal entry for " - f"{self.name}: NB returned {e}" + "Unable to create journal entry for %s: NB returned {e}", self.name ) return False return False @@ -947,8 +963,9 @@ class PhysicalDevice: tmpls_from_zabbix.pop(pos) succesfull_templates.append(nb_tmpl) self.logger.debug( - f"Host {self.name}: Template " - f"'{nb_tmpl['name']}' is present in Zabbix." + "Host %s: Template '%s' is present in Zabbix.", + self.name, + nb_tmpl["name"], ) break if ( diff --git a/modules/hostgroups.py b/modules/hostgroups.py index 213b4cf..58bb057 100644 --- a/modules/hostgroups.py +++ b/modules/hostgroups.py @@ -94,8 +94,11 @@ class Hostgroup: format_options["cluster"] = self.nb.cluster.name format_options["cluster_type"] = self.nb.cluster.type.name self.format_options = format_options - self.logger.debug(f"Host {self.name}: Resolved properties for use " - f"in hostgroups: {self.format_options}") + self.logger.debug( + "Host %s: Resolved properties for use in hostgroups: %s", + self.name, + self.format_options, + ) def set_nesting( self, nested_sitegroup_flag, nested_region_flag, nb_groups, nb_regions @@ -134,7 +137,9 @@ class Hostgroup: if hostgroup_value: hg_output.append(hostgroup_value) else: - self.logger.info(f"Host {self.name}: Used field '{hg_item}' has no value.") + self.logger.info( + "Host %s: Used field '%s' has no value.", self.name, hg_item + ) # Check if the hostgroup is populated with at least one item. if bool(hg_output): return "/".join(hg_output) diff --git a/modules/tags.py b/modules/tags.py index f341abd..835490c 100644 --- a/modules/tags.py +++ b/modules/tags.py @@ -3,6 +3,7 @@ """ All of the Zabbix Usermacro related configuration """ + from logging import getLogger from modules.tools import field_mapper, remove_duplicates @@ -76,7 +77,7 @@ class ZabbixTags: else: tag["tag"] = tag_name else: - self.logger.warning(f"Tag '{tag_name}' is not a valid tag name, skipping.") + self.logger.warning("Tag '%s' is not a valid tag name, skipping.", tag_name) return False if self.validate_value(tag_value): @@ -86,7 +87,7 @@ class ZabbixTags: tag["value"] = tag_value else: self.logger.info( - f"Tag '{tag_name}' has an invalid value: '{tag_value}', skipping." + "Tag '%s' has an invalid value: '%s', skipping.", tag_name, tag_value ) return False return tag @@ -99,7 +100,7 @@ class ZabbixTags: tags = [] # Parse the field mapper for tags if self.tag_map: - self.logger.debug(f"Host {self.nb.name}: Starting tag mapper.") + self.logger.debug("Host %s: Starting tag mapper.", self.nb.name) field_tags = field_mapper(self.nb.name, self.tag_map, self.nb, self.logger) for tag, value in field_tags.items(): t = self.render_tag(tag, value) @@ -131,5 +132,5 @@ class ZabbixTags: tags.append(t) tags = remove_duplicates(tags, sortkey="tag") - self.logger.debug(f"Host {self.name}: Resolved tags: {tags}") + self.logger.debug("Host %s: Resolved tags: %s", self.name, tags) return tags diff --git a/modules/tools.py b/modules/tools.py index c49f5dd..ae7a12b 100644 --- a/modules/tools.py +++ b/modules/tools.py @@ -1,6 +1,8 @@ """A collection of tools used by several classes""" + from modules.exceptions import HostgroupError + def convert_recordset(recordset): """Converts netbox RedcordSet to list of dicts.""" recordlist = [] @@ -72,19 +74,22 @@ def field_mapper(host, mapper, nbdevice, logger): elif not value: # empty value should just be an empty string for API compatibility logger.info( - f"Host {host}: NetBox lookup for " - f"'{nb_field}' returned an empty value." + "Host %s: NetBox lookup for '%s' returned an empty value.", + host, + nb_field, ) data[zbx_field] = "" else: # Value is not a string or numeral, probably not what the user expected. logger.info( - f"Host {host}: Lookup for '{nb_field}'" - " returned an unexpected type: it will be skipped." + "Host %s: Lookup for '%s' returned an unexpected type: it will be skipped.", + host, + nb_field, ) logger.debug( - f"Host {host}: Field mapping complete. " - f"Mapped {len(list(filter(None, data.values())))} field(s)." + "Host %s: Field mapping complete. Mapped %s field(s).", + host, + len(list(filter(None, data.values()))), ) return data @@ -101,7 +106,9 @@ def remove_duplicates(input_list, sortkey=None): return output_list -def verify_hg_format(hg_format, device_cfs=None, vm_cfs=None, hg_type="dev", logger=None): +def verify_hg_format( + hg_format, device_cfs=None, vm_cfs=None, hg_type="dev", logger=None +): """ Verifies hostgroup field format """ @@ -109,44 +116,51 @@ def verify_hg_format(hg_format, device_cfs=None, vm_cfs=None, hg_type="dev", log device_cfs = [] if not vm_cfs: vm_cfs = [] - allowed_objects = {"dev": ["location", - "rack", - "role", - "manufacturer", - "region", - "site", - "site_group", - "tenant", - "tenant_group", - "platform", - "cluster"] - ,"vm": ["cluster_type", - "role", - "manufacturer", - "region", - "site", - "site_group", - "tenant", - "tenant_group", - "cluster", - "device", - "platform"] - ,"cfs": {"dev": [], "vm": []} - } + allowed_objects = { + "dev": [ + "location", + "rack", + "role", + "manufacturer", + "region", + "site", + "site_group", + "tenant", + "tenant_group", + "platform", + "cluster", + ], + "vm": [ + "cluster_type", + "role", + "manufacturer", + "region", + "site", + "site_group", + "tenant", + "tenant_group", + "cluster", + "device", + "platform", + ], + "cfs": {"dev": [], "vm": []}, + } for cf in device_cfs: - allowed_objects['cfs']['dev'].append(cf.name) + allowed_objects["cfs"]["dev"].append(cf.name) for cf in vm_cfs: - allowed_objects['cfs']['vm'].append(cf.name) + allowed_objects["cfs"]["vm"].append(cf.name) hg_objects = [] - if isinstance(hg_format,list): + if isinstance(hg_format, list): for f in hg_format: hg_objects = hg_objects + f.split("/") else: hg_objects = hg_format.split("/") hg_objects = sorted(set(hg_objects)) for hg_object in hg_objects: - if (hg_object not in allowed_objects[hg_type] and - hg_object not in allowed_objects['cfs'][hg_type]): + if ( + hg_object not in allowed_objects[hg_type] + and hg_object not in allowed_objects["cfs"][hg_type] + ): e = ( f"Hostgroup item {hg_object} is not valid. Make sure you" " use valid items and separate them with '/'." @@ -168,8 +182,7 @@ def sanatize_log_output(data): if "macros" in data: for macro in sanitized_data["macros"]: # Check if macro is secret type - if not (macro["type"] == str(1) or - macro["type"] == 1): + if not (macro["type"] == str(1) or macro["type"] == 1): continue macro["value"] = "********" # Check for interface data diff --git a/modules/usermacros.py b/modules/usermacros.py index cfa8082..acf8725 100644 --- a/modules/usermacros.py +++ b/modules/usermacros.py @@ -3,6 +3,7 @@ """ All of the Zabbix Usermacro related configuration """ + from logging import getLogger from re import match @@ -57,8 +58,11 @@ class ZabbixUsermacros: macro["macro"] = str(macro_name) if isinstance(macro_properties, dict): if not "value" in macro_properties: - self.logger.info(f"Host {self.name}: Usermacro {macro_name} has " - "no value in Netbox, skipping.") + self.logger.info( + "Host %s: Usermacro %s has no value in Netbox, skipping.", + self.name, + macro_name, + ) return False macro["value"] = macro_properties["value"] @@ -83,12 +87,17 @@ class ZabbixUsermacros: macro["description"] = "" else: - self.logger.info(f"Host {self.name}: Usermacro {macro_name} " - "has no value, skipping.") + self.logger.info( + "Host %s: Usermacro %s has no value, skipping.", + self.name, + macro_name, + ) return False else: self.logger.warning( - f"Host {self.name}: Usermacro {macro_name} is not a valid usermacro name, skipping." + "Host %s: Usermacro %s is not a valid usermacro name, skipping.", + self.name, + macro_name, ) return False return macro @@ -98,10 +107,10 @@ class ZabbixUsermacros: Generate full set of Usermacros """ macros = [] - data={} + data = {} # Parse the field mapper for usermacros if self.usermacro_map: - self.logger.debug(f"Host {self.nb.name}: Starting usermacro mapper.") + self.logger.debug("Host %s: Starting usermacro mapper.", self.nb.name) field_macros = field_mapper( self.nb.name, self.usermacro_map, self.nb, self.logger ) @@ -120,6 +129,8 @@ class ZabbixUsermacros: m = self.render_macro(macro, properties) if m: macros.append(m) - data={'macros': macros} - self.logger.debug(f"Host {self.name}: Resolved macros: {sanatize_log_output(data)}") + data = {"macros": macros} + self.logger.debug( + "Host %s: Resolved macros: %s", self.name, sanatize_log_output(data) + ) return macros diff --git a/netbox_zabbix_sync.py b/netbox_zabbix_sync.py index 3e783e8..79fa27e 100755 --- a/netbox_zabbix_sync.py +++ b/netbox_zabbix_sync.py @@ -2,6 +2,7 @@ # pylint: disable=invalid-name, logging-not-lazy, too-many-locals, logging-fstring-interpolation """NetBox to Zabbix sync script.""" + import argparse import logging import ssl @@ -67,15 +68,15 @@ def main(arguments): try: # Get NetBox version nb_version = netbox.version - logger.debug(f"NetBox version is {nb_version}.") + logger.debug("NetBox version is %s.", nb_version) except RequestsConnectionError: logger.error( - f"Unable to connect to NetBox with URL {netbox_host}." - " Please check the URL and status of NetBox." + "Unable to connect to NetBox with URL %s. Please check the URL and status of NetBox.", + netbox_host, ) sys.exit(1) except NBRequestError as e: - logger.error(f"NetBox error: {e}") + logger.error("NetBox error: %s", e) sys.exit(1) # Check if the provided Hostgroup layout is valid device_cfs = [] @@ -83,14 +84,18 @@ def main(arguments): device_cfs = list( netbox.extras.custom_fields.filter(type="text", content_types="dcim.device") ) - verify_hg_format(config["hostgroup_format"], - device_cfs=device_cfs, hg_type="dev", logger=logger) + verify_hg_format( + config["hostgroup_format"], device_cfs=device_cfs, hg_type="dev", logger=logger + ) if config["sync_vms"]: vm_cfs = list( - netbox.extras.custom_fields.filter(type="text", - content_types="virtualization.virtualmachine") + netbox.extras.custom_fields.filter( + type="text", content_types="virtualization.virtualmachine" + ) + ) + verify_hg_format( + config["vm_hostgroup_format"], vm_cfs=vm_cfs, hg_type="vm", logger=logger ) - verify_hg_format(config["vm_hostgroup_format"], vm_cfs=vm_cfs, hg_type="vm", logger=logger) # Set Zabbix API try: ssl_ctx = ssl.create_default_context() @@ -120,7 +125,8 @@ def main(arguments): netbox_vms = [] if config["sync_vms"]: netbox_vms = list( - netbox.virtualization.virtual_machines.filter(**config["nb_vm_filter"])) + netbox.virtualization.virtual_machines.filter(**config["nb_vm_filter"]) + ) netbox_site_groups = convert_recordset((netbox.dcim.site_groups.all())) netbox_regions = convert_recordset(netbox.dcim.regions.all()) netbox_journals = netbox.extras.journal_entries @@ -141,15 +147,22 @@ def main(arguments): # Go through all NetBox devices for nb_vm in netbox_vms: try: - vm = VirtualMachine(nb_vm, zabbix, netbox_journals, nb_version, - config["create_journal"], logger) - logger.debug(f"Host {vm.name}: Started operations on VM.") + vm = VirtualMachine( + nb_vm, + zabbix, + netbox_journals, + nb_version, + config["create_journal"], + logger, + ) + logger.debug("Host %s: Started operations on VM.", vm.name) vm.set_vm_template() # Check if a valid template has been found for this VM. if not vm.zbx_template_names: continue - vm.set_hostgroup(config["vm_hostgroup_format"], - netbox_site_groups, netbox_regions) + vm.set_hostgroup( + config["vm_hostgroup_format"], netbox_site_groups, netbox_regions + ) # Check if a valid hostgroup has been found for this VM. if not vm.hostgroups: continue @@ -162,13 +175,12 @@ def main(arguments): # Delete device from Zabbix # and remove hostID from NetBox. vm.cleanup() - logger.info(f"VM {vm.name}: cleanup complete") + logger.info("VM %s: cleanup complete", vm.name) continue # Device has been added to NetBox # but is not in Activate state logger.info( - f"VM {vm.name}: Skipping since this VM is " - f"not in the active state." + "VM %s: Skipping since this VM is not in the active state.", vm.name ) continue # Check if the VM is in the disabled state @@ -200,20 +212,31 @@ def main(arguments): for nb_device in netbox_devices: try: # Set device instance set data such as hostgroup and template information. - device = PhysicalDevice(nb_device, zabbix, netbox_journals, nb_version, - config["create_journal"], logger) - logger.debug(f"Host {device.name}: Started operations on device.") - device.set_template(config["templates_config_context"], - config["templates_config_context_overrule"]) + device = PhysicalDevice( + nb_device, + zabbix, + netbox_journals, + nb_version, + config["create_journal"], + logger, + ) + logger.debug("Host %s: Started operations on device.", device.name) + device.set_template( + config["templates_config_context"], + config["templates_config_context_overrule"], + ) # Check if a valid template has been found for this VM. if not device.zbx_template_names: continue device.set_hostgroup( - config["hostgroup_format"], netbox_site_groups, netbox_regions) + config["hostgroup_format"], netbox_site_groups, netbox_regions + ) # Check if a valid hostgroup has been found for this VM. if not device.hostgroups: - logger.warning(f"Host {device.name}: Host has no valid " - f"hostgroups, Skipping this host...") + logger.warning( + "Host %s: Host has no valid hostgroups, Skipping this host...", + device.name, + ) continue device.set_inventory(nb_device) device.set_usermacros() @@ -223,16 +246,16 @@ def main(arguments): if device.isCluster() and config["clustering"]: # Check if device is primary or secondary if device.promoteMasterDevice(): - e = f"Device {device.name}: is " f"part of cluster and primary." - logger.info(e) + logger.info( + "Device %s: is part of cluster and primary.", device.name + ) else: # Device is secondary in cluster. # Don't continue with this device. - e = ( - f"Device {device.name}: Is part of cluster " - f"but not primary. Skipping this host..." + logger.info( + "Device %s: Is part of cluster but not primary. Skipping this host...", + device.name, ) - logger.info(e) continue # Checks if device is in cleanup state if device.status in config["zabbix_device_removal"]: @@ -240,13 +263,13 @@ def main(arguments): # Delete device from Zabbix # and remove hostID from NetBox. device.cleanup() - logger.info(f"Device {device.name}: cleanup complete") + logger.info("Device %s: cleanup complete", device.name) continue # Device has been added to NetBox # but is not in Activate state logger.info( - f"Device {device.name}: Skipping since this device is " - f"not in the active state." + "Device %s: Skipping since this device is not in the active state.", + device.name, ) continue # Check if the device is in the disabled state