diff --git a/.github/workflows/quality.yml b/.github/workflows/quality.yml index 8dfbe3f..81cad97 100644 --- a/.github/workflows/quality.yml +++ b/.github/workflows/quality.yml @@ -2,7 +2,6 @@ name: Pylint Quality control on: - push: pull_request: workflow_call: diff --git a/.github/workflows/run_tests.yml b/.github/workflows/run_tests.yml index 6a16a64..589fc47 100644 --- a/.github/workflows/run_tests.yml +++ b/.github/workflows/run_tests.yml @@ -2,7 +2,6 @@ name: Pytest code testing on: - push: pull_request: workflow_call: diff --git a/README.md b/README.md index ef280cc..e6cabfa 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,7 @@ pip. ```sh # Packages: pynetbox -pyzabbix +zabbix-utils # Install them through requirements.txt from a venv: virtualenv .venv @@ -212,6 +212,17 @@ in `config.py` the script will render a full region path of all parent regions for the hostgroup name. `traverse_site_groups` controls the same behaviour for site_groups. +**Hardcoded text** + +You can add hardcoded text in the hostgroup format by using quotes, this will +insert the literal text: + +```python +hostgroup_format = "'MyDevices'/location/role" +``` + +In this case, the prefix MyDevices will be used for all generated groups. + **Custom fields** You can use the value of custom fields for hostgroup generation. This allows diff --git a/modules/device.py b/modules/device.py index e61cede..cf0e2a2 100644 --- a/modules/device.py +++ b/modules/device.py @@ -26,6 +26,7 @@ from modules.config import load_config config = load_config() + class PhysicalDevice: # pylint: disable=too-many-instance-attributes, too-many-arguments, too-many-positional-arguments """ @@ -48,6 +49,7 @@ class PhysicalDevice: self.zbx_template_names = [] self.zbx_templates = [] self.hostgroups = [] + self.hostgroup_type = "dev" self.tenant = nb.tenant self.config_context = nb.config_context self.zbxproxy = None @@ -96,8 +98,8 @@ class PhysicalDevice: if config["device_cf"] in self.nb.custom_fields: self.zabbix_id = self.nb.custom_fields[config["device_cf"]] else: - e = f'Host {self.name}: Custom field {config["device_cf"]} not present' - self.logger.warning(e) + e = f"Host {self.name}: Custom field {config['device_cf']} not present" + self.logger.error(e) raise SyncInventoryError(e) # Validate hostname format. @@ -110,9 +112,11 @@ class PhysicalDevice: self.visible_name = self.nb.name self.use_visible_name = True self.logger.info( - f"Host {self.visible_name} contains special characters. " - f"Using {self.name} as name for the NetBox object " - f"and using {self.visible_name} as visible name in Zabbix." + "Host %s contains special characters." + "Using %s as name for the NetBox object and using %s as visible name in Zabbix.", + self.visible_name, + self.name, + self.visible_name, ) else: pass @@ -121,12 +125,12 @@ class PhysicalDevice: """Set the hostgroup for this device""" # Create new Hostgroup instance hg = Hostgroup( - "dev", + self.hostgroup_type, self.nb, self.nb_api_version, logger=self.logger, - nested_sitegroup_flag=config['traverse_site_groups'], - nested_region_flag=config['traverse_regions'], + nested_sitegroup_flag=config["traverse_site_groups"], + nested_region_flag=config["traverse_regions"], nb_groups=nb_site_groups, nb_regions=nb_regions, ) @@ -135,6 +139,14 @@ class PhysicalDevice: self.hostgroups = [hg.generate(f) for f in hg_format] else: self.hostgroups.append(hg.generate(hg_format)) + # Remove duplicates and None values + self.hostgroups = list(filter(None, list(set(self.hostgroups)))) + if self.hostgroups: + self.logger.debug( + "Host %s: Should be member of groups: %s", self.name, self.hostgroups + ) + return True + return False def set_template(self, prefer_config_context, overrule_custom): """Set Template""" @@ -177,8 +189,6 @@ class PhysicalDevice: self.logger.warning(e) raise TemplateError(e) - - def get_templates_context(self): """Get Zabbix templates from the device context""" if "zabbix" not in self.config_context: @@ -203,9 +213,11 @@ class PhysicalDevice: # Set inventory mode. Default is disabled (see class init function). if config["inventory_mode"] == "disabled": if config["inventory_sync"]: - self.logger.error(f"Host {self.name}: Unable to map NetBox inventory to Zabbix. " - "Inventory sync is enabled in " - "config but inventory mode is disabled.") + self.logger.error( + "Host %s: Unable to map NetBox inventory to Zabbix." + "Inventory sync is enabled in config but inventory mode is disabled", + self.name, + ) return True if config["inventory_mode"] == "manual": self.inventory_mode = 0 @@ -213,16 +225,20 @@ class PhysicalDevice: self.inventory_mode = 1 else: self.logger.error( - f"Host {self.name}: Specified value for inventory mode in" - f" config is not valid. Got value {config['inventory_mode']}" + "Host %s: Specified value for inventory mode in config is not valid. Got value %s", + self.name, + config["inventory_mode"], ) return False self.inventory = {} if config["inventory_sync"] and self.inventory_mode in [0, 1]: - self.logger.debug(f"Host {self.name}: Starting inventory mapper") + self.logger.debug("Host %s: Starting inventory mapper.", self.name) self.inventory = field_mapper( self.name, self._inventory_map(), nbdevice, self.logger ) + self.logger.debug( + "Host %s: Resolved inventory: %s", self.name, self.inventory + ) return True def isCluster(self): @@ -240,14 +256,14 @@ class PhysicalDevice: f"Unable to proces {self.name} for cluster calculation: " f"not part of a cluster." ) - self.logger.warning(e) + self.logger.info(e) raise SyncInventoryError(e) if not self.nb.virtual_chassis.master: e = ( f"{self.name} is part of a NetBox virtual chassis which does " "not have a master configured. Skipping for this reason." ) - self.logger.error(e) + self.logger.warning(e) raise SyncInventoryError(e) return self.nb.virtual_chassis.master.id @@ -259,14 +275,15 @@ class PhysicalDevice: """ masterid = self.getClusterMaster() if masterid == self.id: - self.logger.debug( - f"Host {self.name} is primary cluster member. " - f"Modifying hostname from {self.name} to " - + f"{self.nb.virtual_chassis.name}." + self.logger.info( + "Host %s is primary cluster member. Modifying hostname from %s to %s.", + self.name, + self.name, + self.nb.virtual_chassis.name, ) self.name = self.nb.virtual_chassis.name return True - self.logger.debug(f"Host {self.name} is non-primary cluster member.") + self.logger.info("Host %s is non-primary cluster member.", self.name) return False def zbxTemplatePrepper(self, templates): @@ -278,7 +295,7 @@ class PhysicalDevice: # Check if there are templates defined if not self.zbx_template_names: e = f"Host {self.name}: No templates found" - self.logger.info(e) + self.logger.warning(e) raise SyncInventoryError() # Set variable to empty list self.zbx_templates = [] @@ -298,7 +315,10 @@ class PhysicalDevice: "name": zbx_template["name"], } ) - e = f"Host {self.name}: found template {zbx_template['name']}" + e = ( + f"Host {self.name}: Found template '{zbx_template['name']}' " + f"(ID:{zbx_template['templateid']})" + ) self.logger.debug(e) # Return error should the template not be found in Zabbix if not template_match: @@ -321,8 +341,8 @@ class PhysicalDevice: if group["name"] == hg: self.group_ids.append({"groupid": group["groupid"]}) e = ( - f"Host {self.name}: matched group " - f"\"{group['name']}\" (ID:{group['groupid']})" + f"Host {self.name}: Matched group " + f'"{group["name"]}" (ID:{group["groupid"]})' ) self.logger.debug(e) if len(self.group_ids) == len(self.hostgroups): @@ -403,7 +423,7 @@ class PhysicalDevice: macros = ZabbixUsermacros( self.nb, self._usermacro_map(), - config['usermacro_sync'], + config["usermacro_sync"], logger=self.logger, host=self.name, ) @@ -421,16 +441,16 @@ class PhysicalDevice: tags = ZabbixTags( self.nb, self._tag_map(), - config['tag_sync'], - config['tag_lower'], - tag_name=config['tag_name'], - tag_value=config['tag_value'], + tag_sync=config["tag_sync"], + tag_lower=config["tag_lower"], + tag_name=config["tag_name"], + tag_value=config["tag_value"], logger=self.logger, host=self.name, ) - if tags.sync is False: + if config["tag_sync"] is False: self.tags = [] - + return False self.tags = tags.generate() return True @@ -468,12 +488,12 @@ class PhysicalDevice: # If the proxy name matches if proxy["name"] == proxy_name: self.logger.debug( - f"Host {self.name}: using {proxy['type']}" f" {proxy_name}" + "Host %s: using {proxy['type']} '%s'", self.name, proxy_name ) self.zbxproxy = proxy return True self.logger.warning( - f"Host {self.name}: unable to find proxy {proxy_name}" + "Host %s: unable to find proxy %s", self.name, proxy_name ) return False @@ -503,7 +523,6 @@ class PhysicalDevice: templateids.append({"templateid": template["templateid"]}) # Set interface, group and template configuration interfaces = self.setInterfaceDetails() - groups = self.group_ids # Set Zabbix proxy if defined self.setProxy(proxies) # Set basic data for host creation @@ -512,7 +531,7 @@ class PhysicalDevice: "name": self.visible_name, "status": self.zabbix_state, "interfaces": interfaces, - "groups": groups, + "groups": self.group_ids, "templates": templateids, "description": description, "inventory_mode": self.inventory_mode, @@ -541,12 +560,12 @@ class PhysicalDevice: # Set NetBox custom field to hostID value. self.nb.custom_fields[config["device_cf"]] = int(self.zabbix_id) self.nb.save() - msg = f"Host {self.name}: Created host in Zabbix." + msg = f"Host {self.name}: Created host in Zabbix. (ID:{self.zabbix_id})" self.logger.info(msg) self.create_journal_entry("success", msg) else: self.logger.error( - f"Host {self.name}: Unable to add to Zabbix. Host already present." + "Host %s: Unable to add to Zabbix. Host already present.", self.name ) def createZabbixHostgroup(self, hostgroups): @@ -604,7 +623,9 @@ class PhysicalDevice: ) self.logger.error(e) raise SyncExternalError(e) from None - self.logger.info(f"Host {self.name}: updated with data {sanatize_log_output(kwargs)}.") + self.logger.info( + "Host %s: updated with data %s.", self.name, sanatize_log_output(kwargs) + ) self.create_journal_entry("info", "Updated host in Zabbix with latest NB data.") def ConsistencyCheck( @@ -615,7 +636,7 @@ class PhysicalDevice: Checks if Zabbix object is still valid with NetBox parameters. """ # If group is found or if the hostgroup is nested - if not self.setZabbixGroupID(groups): # or len(self.hostgroups.split("/")) > 1: + if not self.setZabbixGroupID(groups): # or len(self.hostgroups.split("/")) > 1: if create_hostgroups: # Script is allowed to create a new hostgroup new_groups = self.createZabbixHostgroup(groups) @@ -632,8 +653,6 @@ class PhysicalDevice: ) self.logger.warning(e) raise SyncInventoryError(e) - #if self.group_ids: - # self.group_ids.append(self.pri_group_id) # Prepare templates and proxy config self.zbxTemplatePrepper(templates) @@ -666,28 +685,30 @@ class PhysicalDevice: raise SyncInventoryError(e) host = host[0] if host["host"] == self.name: - self.logger.debug(f"Host {self.name}: hostname in-sync.") + self.logger.debug("Host %s: Hostname in-sync.", self.name) else: - self.logger.warning( - f"Host {self.name}: hostname OUT of sync. " - f"Received value: {host['host']}" + self.logger.info( + "Host %s: Hostname OUT of sync. Received value: %s", + self.name, + host["host"], ) self.updateZabbixHost(host=self.name) # Execute check depending on wether the name is special or not if self.use_visible_name: if host["name"] == self.visible_name: - self.logger.debug(f"Host {self.name}: visible name in-sync.") + self.logger.debug("Host %s: Visible name in-sync.", self.name) else: - self.logger.warning( - f"Host {self.name}: visible name OUT of sync." - f" Received value: {host['name']}" + self.logger.info( + "Host %s: Visible name OUT of sync. Received value: %s", + self.name, + host["name"], ) self.updateZabbixHost(name=self.visible_name) # Check if the templates are in-sync if not self.zbx_template_comparer(host["parentTemplates"]): - self.logger.warning(f"Host {self.name}: template(s) OUT of sync.") + self.logger.info("Host %s: Template(s) OUT of sync.", self.name) # Prepare Templates for API parsing templateids = [] for template in self.zbx_templates: @@ -697,38 +718,41 @@ class PhysicalDevice: templates_clear=host["parentTemplates"], templates=templateids ) else: - self.logger.debug(f"Host {self.name}: template(s) in-sync.") + self.logger.debug("Host %s: Template(s) in-sync.", self.name) # Check if Zabbix version is 6 or higher. Issue #93 group_dictname = "hostgroups" if str(self.zabbix.version).startswith(("6", "5")): group_dictname = "groups" # Check if hostgroups match - if (sorted(host[group_dictname], key=itemgetter('groupid')) == - sorted(self.group_ids, key=itemgetter('groupid'))): - self.logger.debug(f"Host {self.name}: hostgroups in-sync.") + if sorted(host[group_dictname], key=itemgetter("groupid")) == sorted( + self.group_ids, key=itemgetter("groupid") + ): + self.logger.debug("Host %s: Hostgroups in-sync.", self.name) else: - self.logger.warning(f"Host {self.name}: hostgroups OUT of sync.") + self.logger.info("Host %s: Hostgroups OUT of sync.", self.name) self.updateZabbixHost(groups=self.group_ids) if int(host["status"]) == self.zabbix_state: - self.logger.debug(f"Host {self.name}: status in-sync.") + self.logger.debug("Host %s: Status in-sync.", self.name) else: - self.logger.warning(f"Host {self.name}: status OUT of sync.") + self.logger.info("Host %s: Status OUT of sync.", self.name) self.updateZabbixHost(status=str(self.zabbix_state)) # Check if a proxy has been defined if self.zbxproxy: # Check if proxy or proxy group is defined - if (self.zbxproxy["idtype"] in host and - host[self.zbxproxy["idtype"]] == self.zbxproxy["id"]): - self.logger.debug(f"Host {self.name}: proxy in-sync.") + if ( + self.zbxproxy["idtype"] in host + and host[self.zbxproxy["idtype"]] == self.zbxproxy["id"] + ): + self.logger.debug("Host %s: Proxy in-sync.", self.name) # Backwards compatibility for Zabbix <= 6 elif "proxy_hostid" in host and host["proxy_hostid"] == self.zbxproxy["id"]: - self.logger.debug(f"Host {self.name}: proxy in-sync.") + self.logger.debug("Host %s: Proxy in-sync.", self.name) # Proxy does not match, update Zabbix else: - self.logger.warning(f"Host {self.name}: proxy OUT of sync.") + self.logger.info("Host %s: Proxy OUT of sync.", self.name) # Zabbix <= 6 patch if not str(self.zabbix.version).startswith("7"): self.updateZabbixHost(proxy_hostid=self.zbxproxy["id"]) @@ -751,8 +775,9 @@ class PhysicalDevice: if proxy_power and proxy_set: # Zabbix <= 6 fix self.logger.warning( - f"Host {self.name}: no proxy is configured in NetBox " - "but is configured in Zabbix. Removing proxy config in Zabbix" + "Host %s: No proxy is configured in NetBox but is configured in Zabbix." + "Removing proxy config in Zabbix", + self.name, ) if "proxy_hostid" in host and bool(host["proxy_hostid"]): self.updateZabbixHost(proxy_hostid=0) @@ -765,60 +790,61 @@ class PhysicalDevice: # Checks if a proxy has been defined in Zabbix and if proxy_power config has been set if proxy_set and not proxy_power: # Display error message - self.logger.error( - f"Host {self.name} is configured " - f"with proxy in Zabbix but not in NetBox. The" - " -p flag was ommited: no " - "changes have been made." + self.logger.warning( + "Host %s: Is configured with proxy in Zabbix but not in NetBox." + "The -p flag was ommited: no changes have been made.", + self.name, ) if not proxy_set: - self.logger.debug(f"Host {self.name}: proxy in-sync.") + self.logger.debug("Host %s: Proxy in-sync.", self.name) # Check host inventory mode if str(host["inventory_mode"]) == str(self.inventory_mode): - self.logger.debug(f"Host {self.name}: inventory_mode in-sync.") + self.logger.debug("Host %s: inventory_mode in-sync.", self.name) else: - self.logger.warning(f"Host {self.name}: inventory_mode OUT of sync.") + self.logger.info("Host %s: inventory_mode OUT of sync.", self.name) self.updateZabbixHost(inventory_mode=str(self.inventory_mode)) if config["inventory_sync"] and self.inventory_mode in [0, 1]: # Check host inventory mapping if host["inventory"] == self.inventory: - self.logger.debug(f"Host {self.name}: inventory in-sync.") + self.logger.debug("Host %s: Inventory in-sync.", self.name) else: - self.logger.warning(f"Host {self.name}: inventory OUT of sync.") + self.logger.info("Host %s: Inventory OUT of sync.", self.name) self.updateZabbixHost(inventory=self.inventory) # Check host usermacros - if config['usermacro_sync']: + if config["usermacro_sync"]: # Make a full copy synce we dont want to lose the original value # of secret type macros from Netbox netbox_macros = deepcopy(self.usermacros) # Set the sync bit - full_sync_bit = bool(str(config['usermacro_sync']).lower() == "full") + full_sync_bit = bool(str(config["usermacro_sync"]).lower() == "full") for macro in netbox_macros: # If the Macro is a secret and full sync is NOT activated if macro["type"] == str(1) and not full_sync_bit: # Remove the value as the Zabbix api does not return the value key # This is required when you want to do a diff between both lists macro.pop("value") + # Sort all lists def filter_with_macros(macro): return macro["macro"] + host["macros"].sort(key=filter_with_macros) netbox_macros.sort(key=filter_with_macros) # Check if both lists are the same if host["macros"] == netbox_macros: - self.logger.debug(f"Host {self.name}: usermacros in-sync.") + self.logger.debug("Host %s: Usermacros in-sync.", self.name) else: - self.logger.warning(f"Host {self.name}: usermacros OUT of sync.") + self.logger.info("Host %s: Usermacros OUT of sync.", self.name) # Update Zabbix with NetBox usermacros self.updateZabbixHost(macros=self.usermacros) # Check host tags - if config['tag_sync']: + if config["tag_sync"]: if remove_duplicates(host["tags"], sortkey="tag") == self.tags: - self.logger.debug(f"Host {self.name}: tags in-sync.") + self.logger.debug("Host %s: Tags in-sync.", self.name) else: - self.logger.warning(f"Host {self.name}: tags OUT of sync.") + self.logger.info("Host %s: Tags OUT of sync.", self.name) self.updateZabbixHost(tags=self.tags) # If only 1 interface has been found @@ -856,11 +882,11 @@ class PhysicalDevice: updates[key] = item if updates: # If interface updates have been found: push to Zabbix - self.logger.warning(f"Host {self.name}: Interface OUT of sync.") + self.logger.info("Host %s: Interface OUT of sync.", self.name) if "type" in updates: # Changing interface type not supported. Raise exception. e = ( - f"Host {self.name}: changing interface type to " + f"Host {self.name}: Changing interface type to " f"{str(updates['type'])} is not supported." ) self.logger.error(e) @@ -870,26 +896,27 @@ class PhysicalDevice: try: # API call to Zabbix self.zabbix.hostinterface.update(updates) - e = (f"Host {self.name}: updated interface " - f"with data {sanatize_log_output(updates)}.") - self.logger.info(e) - self.create_journal_entry("info", e) + err_msg = ( + f"Host {self.name}: Updated interface " + f"with data {sanatize_log_output(updates)}." + ) + self.logger.info(err_msg) + self.create_journal_entry("info", err_msg) except APIRequestError as e: msg = f"Zabbix returned the following error: {str(e)}." self.logger.error(msg) raise SyncExternalError(msg) from e else: # If no updates are found, Zabbix interface is in-sync - e = f"Host {self.name}: interface in-sync." - self.logger.debug(e) + self.logger.debug("Host %s: Interface in-sync.", self.name) else: - e = ( - f"Host {self.name} has unsupported interface configuration." + err_msg = ( + f"Host {self.name}: Has unsupported interface configuration." f" Host has total of {len(host['interfaces'])} interfaces. " "Manual intervention required." ) - self.logger.error(e) - raise SyncInventoryError(e) + self.logger.error(err_msg) + raise SyncInventoryError(err_msg) def create_journal_entry(self, severity, message): """ @@ -900,7 +927,7 @@ class PhysicalDevice: # Check if the severity is valid if severity not in ["info", "success", "warning", "danger"]: self.logger.warning( - f"Value {severity} not valid for NB journal entries." + "Value %s not valid for NB journal entries.", severity ) return False journal = { @@ -911,12 +938,13 @@ class PhysicalDevice: } try: self.nb_journals.create(journal) - self.logger.debug(f"Host {self.name}: Created journal entry in NetBox") + self.logger.debug("Host %s: Created journal entry in NetBox", self.name) return True except NetboxRequestError as e: self.logger.warning( - "Unable to create journal entry for " - f"{self.name}: NB returned {e}" + "Unable to create journal entry for %s: NB returned %s", + self.name, + e, ) return False return False @@ -941,8 +969,9 @@ class PhysicalDevice: tmpls_from_zabbix.pop(pos) succesfull_templates.append(nb_tmpl) self.logger.debug( - f"Host {self.name}: template " - f"{nb_tmpl['name']} is present in Zabbix." + "Host %s: Template '%s' is present in Zabbix.", + self.name, + nb_tmpl["name"], ) break if ( diff --git a/modules/hostgroups.py b/modules/hostgroups.py index 8bbec86..5916c0a 100644 --- a/modules/hostgroups.py +++ b/modules/hostgroups.py @@ -11,6 +11,7 @@ class Hostgroup: Takes type (vm or dev) and NB object""" # pylint: disable=too-many-arguments, disable=too-many-positional-arguments + # pylint: disable=logging-fstring-interpolation def __init__( self, obj_type, @@ -93,6 +94,11 @@ class Hostgroup: format_options["cluster"] = self.nb.cluster.name format_options["cluster_type"] = self.nb.cluster.type.name self.format_options = format_options + self.logger.debug( + "Host %s: Resolved properties for use in hostgroups: %s", + self.name, + self.format_options, + ) def set_nesting( self, nested_sitegroup_flag, nested_region_flag, nb_groups, nb_regions @@ -103,49 +109,51 @@ class Hostgroup: "region": {"flag": nested_region_flag, "data": nb_regions}, } - def generate(self, hg_format=None): + def generate(self, hg_format): """Generate hostgroup based on a provided format""" - # Set format to default in case its not specified - if not hg_format: - hg_format = ( - "site/manufacturer/role" if self.type == "dev" else "cluster/role" - ) # Split all given names hg_output = [] hg_items = hg_format.split("/") for hg_item in hg_items: # Check if requested data is available as option for this host if hg_item not in self.format_options: - # Check if a custom field exists with this name - cf_data = self.custom_field_lookup(hg_item) - # CF does not exist - if not cf_data["result"]: - msg = ( - f"Unable to generate hostgroup for host {self.name}. " - f"Item type {hg_item} not supported." - ) - self.logger.error(msg) - raise HostgroupError(msg) - # CF data is populated - if cf_data["cf"]: - hg_output.append(cf_data["cf"]) + if hg_item.startswith(("'", '"')) and hg_item.endswith(("'", '"')): + hg_item = hg_item.strip("\'") + hg_item = hg_item.strip('\"') + hg_output.append(hg_item) + else: + # Check if a custom field exists with this name + cf_data = self.custom_field_lookup(hg_item) + # CF does not exist + if not cf_data["result"]: + msg = ( + f"Unable to generate hostgroup for host {self.name}. " + f"Item type {hg_item} not supported." + ) + self.logger.error(msg) + raise HostgroupError(msg) + # CF data is populated + if cf_data["cf"]: + hg_output.append(cf_data["cf"]) continue # Check if there is a value associated to the variable. # For instance, if a device has no location, do not use it with hostgroup calculation hostgroup_value = self.format_options[hg_item] if hostgroup_value: hg_output.append(hostgroup_value) + else: + self.logger.info( + "Host %s: Used field '%s' has no value.", self.name, hg_item + ) # Check if the hostgroup is populated with at least one item. if bool(hg_output): return "/".join(hg_output) msg = ( - f"Unable to generate hostgroup for host {self.name}." - " Not enough valid items. This is most likely" - " due to the use of custom fields that are empty" - " or an invalid hostgroup format." + f"Host {self.name}: Generating hostgroup name for '{hg_format}' failed. " + f"This is most likely due to fields that have no value." ) - self.logger.error(msg) - raise HostgroupError(msg) + self.logger.warning(msg) + return None def list_formatoptions(self): """ diff --git a/modules/tags.py b/modules/tags.py index 441ebe2..835490c 100644 --- a/modules/tags.py +++ b/modules/tags.py @@ -3,6 +3,7 @@ """ All of the Zabbix Usermacro related configuration """ + from logging import getLogger from modules.tools import field_mapper, remove_duplicates @@ -15,7 +16,7 @@ class ZabbixTags: self, nb, tag_map, - tag_sync, + tag_sync=False, tag_lower=True, tag_name=None, tag_value=None, @@ -76,7 +77,7 @@ class ZabbixTags: else: tag["tag"] = tag_name else: - self.logger.warning(f"Tag {tag_name} is not a valid tag name, skipping.") + self.logger.warning("Tag '%s' is not a valid tag name, skipping.", tag_name) return False if self.validate_value(tag_value): @@ -85,8 +86,8 @@ class ZabbixTags: else: tag["value"] = tag_value else: - self.logger.warning( - f"Tag {tag_name} has an invalid value: '{tag_value}', skipping." + self.logger.info( + "Tag '%s' has an invalid value: '%s', skipping.", tag_name, tag_value ) return False return tag @@ -99,7 +100,7 @@ class ZabbixTags: tags = [] # Parse the field mapper for tags if self.tag_map: - self.logger.debug(f"Host {self.nb.name}: Starting tag mapper") + self.logger.debug("Host %s: Starting tag mapper.", self.nb.name) field_tags = field_mapper(self.nb.name, self.tag_map, self.nb, self.logger) for tag, value in field_tags.items(): t = self.render_tag(tag, value) @@ -130,4 +131,6 @@ class ZabbixTags: if t: tags.append(t) - return remove_duplicates(tags, sortkey="tag") + tags = remove_duplicates(tags, sortkey="tag") + self.logger.debug("Host %s: Resolved tags: %s", self.name, tags) + return tags diff --git a/modules/tools.py b/modules/tools.py index 823410e..dacab20 100644 --- a/modules/tools.py +++ b/modules/tools.py @@ -1,6 +1,8 @@ """A collection of tools used by several classes""" + from modules.exceptions import HostgroupError + def convert_recordset(recordset): """Converts netbox RedcordSet to list of dicts.""" recordlist = [] @@ -71,20 +73,23 @@ def field_mapper(host, mapper, nbdevice, logger): data[zbx_field] = str(value) elif not value: # empty value should just be an empty string for API compatibility - logger.debug( - f"Host {host}: NetBox lookup for " - f"'{nb_field}' returned an empty value" + logger.info( + "Host %s: NetBox lookup for '%s' returned an empty value.", + host, + nb_field, ) data[zbx_field] = "" else: # Value is not a string or numeral, probably not what the user expected. - logger.error( - f"Host {host}: Lookup for '{nb_field}'" - " returned an unexpected type: it will be skipped." + logger.info( + "Host %s: Lookup for '%s' returned an unexpected type: it will be skipped.", + host, + nb_field, ) logger.debug( - f"Host {host}: Field mapping complete. " - f"Mapped {len(list(filter(None, data.values())))} field(s)" + "Host %s: Field mapping complete. Mapped %s field(s).", + host, + len(list(filter(None, data.values()))), ) return data @@ -101,7 +106,9 @@ def remove_duplicates(input_list, sortkey=None): return output_list -def verify_hg_format(hg_format, device_cfs=None, vm_cfs=None, hg_type="dev", logger=None): +def verify_hg_format( + hg_format, device_cfs=None, vm_cfs=None, hg_type="dev", logger=None +): """ Verifies hostgroup field format """ @@ -109,49 +116,57 @@ def verify_hg_format(hg_format, device_cfs=None, vm_cfs=None, hg_type="dev", log device_cfs = [] if not vm_cfs: vm_cfs = [] - allowed_objects = {"dev": ["location", - "rack", - "role", - "manufacturer", - "region", - "site", - "site_group", - "tenant", - "tenant_group", - "platform", - "cluster"] - ,"vm": ["cluster_type", - "role", - "manufacturer", - "region", - "site", - "site_group", - "tenant", - "tenant_group", - "cluster", - "device", - "platform"] - ,"cfs": {"dev": [], "vm": []} - } + allowed_objects = { + "dev": [ + "location", + "rack", + "role", + "manufacturer", + "region", + "site", + "site_group", + "tenant", + "tenant_group", + "platform", + "cluster", + ], + "vm": [ + "cluster_type", + "role", + "manufacturer", + "region", + "site", + "site_group", + "tenant", + "tenant_group", + "cluster", + "device", + "platform", + ], + "cfs": {"dev": [], "vm": []}, + } for cf in device_cfs: - allowed_objects['cfs']['dev'].append(cf.name) + allowed_objects["cfs"]["dev"].append(cf.name) for cf in vm_cfs: - allowed_objects['cfs']['vm'].append(cf.name) + allowed_objects["cfs"]["vm"].append(cf.name) hg_objects = [] - if isinstance(hg_format,list): + if isinstance(hg_format, list): for f in hg_format: hg_objects = hg_objects + f.split("/") else: hg_objects = hg_format.split("/") hg_objects = sorted(set(hg_objects)) for hg_object in hg_objects: - if (hg_object not in allowed_objects[hg_type] and - hg_object not in allowed_objects['cfs'][hg_type]): + if ( + hg_object not in allowed_objects[hg_type] + and hg_object not in allowed_objects["cfs"][hg_type] + and not hg_object.startswith(('"',"'")) + ): e = ( f"Hostgroup item {hg_object} is not valid. Make sure you" " use valid items and separate them with '/'." ) - logger.error(e) + logger.warning(e) raise HostgroupError(e) @@ -159,7 +174,7 @@ def sanatize_log_output(data): """ Used for the update function to Zabbix which shows the data that its using to update the host. - Removes and sensitive data from the input. + Removes any sensitive data from the input. """ if not isinstance(data, dict): return data @@ -168,7 +183,7 @@ def sanatize_log_output(data): if "macros" in data: for macro in sanitized_data["macros"]: # Check if macro is secret type - if not macro["type"] == str(1): + if not (macro["type"] == str(1) or macro["type"] == 1): continue macro["value"] = "********" # Check for interface data diff --git a/modules/usermacros.py b/modules/usermacros.py index 6d396c8..acf8725 100644 --- a/modules/usermacros.py +++ b/modules/usermacros.py @@ -3,10 +3,11 @@ """ All of the Zabbix Usermacro related configuration """ + from logging import getLogger from re import match -from modules.tools import field_mapper +from modules.tools import field_mapper, sanatize_log_output class ZabbixUsermacros: @@ -57,8 +58,11 @@ class ZabbixUsermacros: macro["macro"] = str(macro_name) if isinstance(macro_properties, dict): if not "value" in macro_properties: - self.logger.warning(f"Host {self.name}: Usermacro {macro_name} has " - "no value in Netbox, skipping.") + self.logger.info( + "Host %s: Usermacro %s has no value in Netbox, skipping.", + self.name, + macro_name, + ) return False macro["value"] = macro_properties["value"] @@ -83,12 +87,17 @@ class ZabbixUsermacros: macro["description"] = "" else: - self.logger.warning(f"Host {self.name}: Usermacro {macro_name} " - "has no value, skipping.") + self.logger.info( + "Host %s: Usermacro %s has no value, skipping.", + self.name, + macro_name, + ) return False else: - self.logger.error( - f"Host {self.name}: Usermacro {macro_name} is not a valid usermacro name, skipping." + self.logger.warning( + "Host %s: Usermacro %s is not a valid usermacro name, skipping.", + self.name, + macro_name, ) return False return macro @@ -98,9 +107,10 @@ class ZabbixUsermacros: Generate full set of Usermacros """ macros = [] + data = {} # Parse the field mapper for usermacros if self.usermacro_map: - self.logger.debug(f"Host {self.nb.name}: Starting usermacro mapper") + self.logger.debug("Host %s: Starting usermacro mapper.", self.nb.name) field_macros = field_mapper( self.nb.name, self.usermacro_map, self.nb, self.logger ) @@ -119,4 +129,8 @@ class ZabbixUsermacros: m = self.render_macro(macro, properties) if m: macros.append(m) + data = {"macros": macros} + self.logger.debug( + "Host %s: Resolved macros: %s", self.name, sanatize_log_output(data) + ) return macros diff --git a/modules/virtual_machine.py b/modules/virtual_machine.py index e0f7abb..8c52033 100644 --- a/modules/virtual_machine.py +++ b/modules/virtual_machine.py @@ -2,7 +2,6 @@ """Module that hosts all functions for virtual machine processing""" from modules.device import PhysicalDevice from modules.exceptions import InterfaceConfigError, SyncInventoryError, TemplateError -from modules.hostgroups import Hostgroup from modules.interface import ZabbixInterface from modules.config import load_config # Load config @@ -16,6 +15,7 @@ class VirtualMachine(PhysicalDevice): super().__init__(*args, **kwargs) self.hostgroup = None self.zbx_template_names = None + self.hostgroup_type = "vm" def _inventory_map(self): """use VM inventory maps""" @@ -29,25 +29,6 @@ class VirtualMachine(PhysicalDevice): """use VM tag maps""" return config["vm_tag_map"] - def set_hostgroup(self, hg_format, nb_site_groups, nb_regions): - """Set the hostgroup for this device""" - # Create new Hostgroup instance - hg = Hostgroup( - "vm", - self.nb, - self.nb_api_version, - logger=self.logger, - nested_sitegroup_flag=config["traverse_site_groups"], - nested_region_flag=config["traverse_regions"], - nb_groups=nb_site_groups, - nb_regions=nb_regions, - ) - # Generate hostgroup based on hostgroup format - if isinstance(hg_format, list): - self.hostgroups = [hg.generate(f) for f in hg_format] - else: - self.hostgroups.append(hg.generate(hg_format)) - def set_vm_template(self): """Set Template for VMs. Overwrites default class to skip a lookup of custom fields.""" diff --git a/netbox_zabbix_sync.py b/netbox_zabbix_sync.py index d9ff71b..79fa27e 100755 --- a/netbox_zabbix_sync.py +++ b/netbox_zabbix_sync.py @@ -2,6 +2,7 @@ # pylint: disable=invalid-name, logging-not-lazy, too-many-locals, logging-fstring-interpolation """NetBox to Zabbix sync script.""" + import argparse import logging import ssl @@ -67,15 +68,15 @@ def main(arguments): try: # Get NetBox version nb_version = netbox.version - logger.debug(f"NetBox version is {nb_version}.") + logger.debug("NetBox version is %s.", nb_version) except RequestsConnectionError: logger.error( - f"Unable to connect to NetBox with URL {netbox_host}." - " Please check the URL and status of NetBox." + "Unable to connect to NetBox with URL %s. Please check the URL and status of NetBox.", + netbox_host, ) sys.exit(1) except NBRequestError as e: - logger.error(f"NetBox error: {e}") + logger.error("NetBox error: %s", e) sys.exit(1) # Check if the provided Hostgroup layout is valid device_cfs = [] @@ -83,14 +84,18 @@ def main(arguments): device_cfs = list( netbox.extras.custom_fields.filter(type="text", content_types="dcim.device") ) - verify_hg_format(config["hostgroup_format"], - device_cfs=device_cfs, hg_type="dev", logger=logger) + verify_hg_format( + config["hostgroup_format"], device_cfs=device_cfs, hg_type="dev", logger=logger + ) if config["sync_vms"]: vm_cfs = list( - netbox.extras.custom_fields.filter(type="text", - content_types="virtualization.virtualmachine") + netbox.extras.custom_fields.filter( + type="text", content_types="virtualization.virtualmachine" + ) + ) + verify_hg_format( + config["vm_hostgroup_format"], vm_cfs=vm_cfs, hg_type="vm", logger=logger ) - verify_hg_format(config["vm_hostgroup_format"], vm_cfs=vm_cfs, hg_type="vm", logger=logger) # Set Zabbix API try: ssl_ctx = ssl.create_default_context() @@ -120,7 +125,8 @@ def main(arguments): netbox_vms = [] if config["sync_vms"]: netbox_vms = list( - netbox.virtualization.virtual_machines.filter(**config["nb_vm_filter"])) + netbox.virtualization.virtual_machines.filter(**config["nb_vm_filter"]) + ) netbox_site_groups = convert_recordset((netbox.dcim.site_groups.all())) netbox_regions = convert_recordset(netbox.dcim.regions.all()) netbox_journals = netbox.extras.journal_entries @@ -141,15 +147,22 @@ def main(arguments): # Go through all NetBox devices for nb_vm in netbox_vms: try: - vm = VirtualMachine(nb_vm, zabbix, netbox_journals, nb_version, - config["create_journal"], logger) - logger.debug(f"Host {vm.name}: started operations on VM.") + vm = VirtualMachine( + nb_vm, + zabbix, + netbox_journals, + nb_version, + config["create_journal"], + logger, + ) + logger.debug("Host %s: Started operations on VM.", vm.name) vm.set_vm_template() # Check if a valid template has been found for this VM. if not vm.zbx_template_names: continue - vm.set_hostgroup(config["vm_hostgroup_format"], - netbox_site_groups, netbox_regions) + vm.set_hostgroup( + config["vm_hostgroup_format"], netbox_site_groups, netbox_regions + ) # Check if a valid hostgroup has been found for this VM. if not vm.hostgroups: continue @@ -162,13 +175,12 @@ def main(arguments): # Delete device from Zabbix # and remove hostID from NetBox. vm.cleanup() - logger.debug(f"VM {vm.name}: cleanup complete") + logger.info("VM %s: cleanup complete", vm.name) continue # Device has been added to NetBox # but is not in Activate state logger.info( - f"VM {vm.name}: skipping since this VM is " - f"not in the active state." + "VM %s: Skipping since this VM is not in the active state.", vm.name ) continue # Check if the VM is in the disabled state @@ -200,18 +212,31 @@ def main(arguments): for nb_device in netbox_devices: try: # Set device instance set data such as hostgroup and template information. - device = PhysicalDevice(nb_device, zabbix, netbox_journals, nb_version, - config["create_journal"], logger) - logger.debug(f"Host {device.name}: started operations on device.") - device.set_template(config["templates_config_context"], - config["templates_config_context_overrule"]) + device = PhysicalDevice( + nb_device, + zabbix, + netbox_journals, + nb_version, + config["create_journal"], + logger, + ) + logger.debug("Host %s: Started operations on device.", device.name) + device.set_template( + config["templates_config_context"], + config["templates_config_context_overrule"], + ) # Check if a valid template has been found for this VM. if not device.zbx_template_names: continue device.set_hostgroup( - config["hostgroup_format"], netbox_site_groups, netbox_regions) + config["hostgroup_format"], netbox_site_groups, netbox_regions + ) # Check if a valid hostgroup has been found for this VM. if not device.hostgroups: + logger.warning( + "Host %s: Host has no valid hostgroups, Skipping this host...", + device.name, + ) continue device.set_inventory(nb_device) device.set_usermacros() @@ -221,16 +246,16 @@ def main(arguments): if device.isCluster() and config["clustering"]: # Check if device is primary or secondary if device.promoteMasterDevice(): - e = f"Device {device.name}: is " f"part of cluster and primary." - logger.info(e) + logger.info( + "Device %s: is part of cluster and primary.", device.name + ) else: # Device is secondary in cluster. # Don't continue with this device. - e = ( - f"Device {device.name}: is part of cluster " - f"but not primary. Skipping this host..." + logger.info( + "Device %s: Is part of cluster but not primary. Skipping this host...", + device.name, ) - logger.info(e) continue # Checks if device is in cleanup state if device.status in config["zabbix_device_removal"]: @@ -238,13 +263,13 @@ def main(arguments): # Delete device from Zabbix # and remove hostID from NetBox. device.cleanup() - logger.info(f"Device {device.name}: cleanup complete") + logger.info("Device %s: cleanup complete", device.name) continue # Device has been added to NetBox # but is not in Activate state logger.info( - f"Device {device.name}: skipping since this device is " - f"not in the active state." + "Device %s: Skipping since this device is not in the active state.", + device.name, ) continue # Check if the device is in the disabled state @@ -280,7 +305,7 @@ if __name__ == "__main__": description="A script to sync Zabbix with NetBox device data." ) parser.add_argument( - "-v", "--verbose", help="Turn on debugging.", action="store_true" + "-v", "--verbose", help="Turn on verbose logging.", action="store_true" ) parser.add_argument( "-vv", "--debug", help="Turn on debugging.", action="store_true" diff --git a/requirements.txt b/requirements.txt index 295b59f..1a3470f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ pynetbox==7.4.1 -zabbix-utils==2.0.2 +zabbix-utils==2.0.3 diff --git a/tests/test_hostgroups.py b/tests/test_hostgroups.py index 1e652ec..995d26c 100644 --- a/tests/test_hostgroups.py +++ b/tests/test_hostgroups.py @@ -163,10 +163,6 @@ class TestHostgroups(unittest.TestCase): """Test different hostgroup formats for devices.""" hostgroup = Hostgroup("dev", self.mock_device, "4.0", self.mock_logger) - # Default format: site/manufacturer/role - default_result = hostgroup.generate() - self.assertEqual(default_result, "TestSite/TestManufacturer/TestRole") - # Custom format: site/region custom_result = hostgroup.generate("site/region") self.assertEqual(custom_result, "TestSite/TestRegion") @@ -180,7 +176,7 @@ class TestHostgroups(unittest.TestCase): hostgroup = Hostgroup("vm", self.mock_vm, "4.0", self.mock_logger) # Default format: cluster/role - default_result = hostgroup.generate() + default_result = hostgroup.generate("cluster/role") self.assertEqual(default_result, "TestCluster/TestRole") # Custom format: site/tenant @@ -251,7 +247,7 @@ class TestHostgroups(unittest.TestCase): hostgroup = Hostgroup("dev", minimal_device, "4.0", self.mock_logger) # Generate with default format - result = hostgroup.generate() + result = hostgroup.generate("site/manufacturer/role") # Site is missing, so only manufacturer and role should be included self.assertEqual(result, "MinimalManufacturer/MinimalRole") @@ -259,24 +255,6 @@ class TestHostgroups(unittest.TestCase): with self.assertRaises(HostgroupError): hostgroup.generate("site/nonexistent/role") - def test_hostgroup_missing_required_attributes(self): - """Test handling when no valid hostgroup can be generated.""" - # Create a VM with minimal attributes that won't satisfy any format - minimal_vm = MagicMock() - minimal_vm.name = "minimal-vm" - minimal_vm.site = None - minimal_vm.tenant = None - minimal_vm.platform = None - minimal_vm.role = None - minimal_vm.cluster = None - minimal_vm.custom_fields = {} - - hostgroup = Hostgroup("vm", minimal_vm, "4.0", self.mock_logger) - - # With default format of cluster/role, both are None, so should raise an error - with self.assertRaises(HostgroupError): - hostgroup.generate() - def test_nested_region_hostgroups(self): """Test hostgroup generation with nested regions.""" # Mock the build_path function to return a predictable result @@ -334,6 +312,60 @@ class TestHostgroups(unittest.TestCase): calls = [call.write(f"The following options are available for host test-device"), call.write('\n')] mock_stdout.assert_has_calls(calls, any_order=True) + + def test_vm_list_based_hostgroup_format(self): + """Test VM hostgroup generation with a list-based format.""" + hostgroup = Hostgroup("vm", self.mock_vm, "4.0", self.mock_logger) + + # Test with a list of format strings + format_list = ["platform", "role", "cluster_type/cluster"] + + # Generate hostgroups for each format in the list + hostgroups = [] + for fmt in format_list: + result = hostgroup.generate(fmt) + if result: # Only add non-None results + hostgroups.append(result) + + # Verify each expected hostgroup is generated + self.assertEqual(len(hostgroups), 3) # Should have 3 hostgroups + self.assertIn("TestPlatform", hostgroups) + self.assertIn("TestRole", hostgroups) + self.assertIn("TestClusterType/TestCluster", hostgroups) + + def test_nested_format_splitting(self): + """Test that formats with slashes correctly split and resolve each component.""" + hostgroup = Hostgroup("vm", self.mock_vm, "4.0", self.mock_logger) + + # Test a format with slashes that should be split + complex_format = "cluster_type/cluster" + result = hostgroup.generate(complex_format) + + # Verify the format is correctly split and each component resolved + self.assertEqual(result, "TestClusterType/TestCluster") + + def test_multiple_hostgroup_formats_device(self): + """Test device hostgroup generation with multiple formats.""" + hostgroup = Hostgroup("dev", self.mock_device, "4.0", self.mock_logger) + + # Test with various formats that would be in a list + formats = [ + "site", + "manufacturer/role", + "platform/location", + "tenant_group/tenant" + ] + + # Generate and check each format + results = {} + for fmt in formats: + results[fmt] = hostgroup.generate(fmt) + + # Verify results + self.assertEqual(results["site"], "TestSite") + self.assertEqual(results["manufacturer/role"], "TestManufacturer/TestRole") + self.assertEqual(results["platform/location"], "TestPlatform/TestLocation") + self.assertEqual(results["tenant_group/tenant"], "TestTenantGroup/TestTenant") if __name__ == "__main__": diff --git a/tests/test_list_hostgroup_formats.py b/tests/test_list_hostgroup_formats.py new file mode 100644 index 0000000..9b8cc21 --- /dev/null +++ b/tests/test_list_hostgroup_formats.py @@ -0,0 +1,137 @@ +"""Tests for list-based hostgroup formats in configuration.""" +import unittest +from unittest.mock import MagicMock, patch +from modules.hostgroups import Hostgroup +from modules.exceptions import HostgroupError +from modules.tools import verify_hg_format + + +class TestListHostgroupFormats(unittest.TestCase): + """Test class for list-based hostgroup format functionality.""" + + def setUp(self): + """Set up test fixtures.""" + # Create mock logger + self.mock_logger = MagicMock() + + # Create mock device + self.mock_device = MagicMock() + self.mock_device.name = "test-device" + + # Set up site information + site = MagicMock() + site.name = "TestSite" + + # Set up region information + region = MagicMock() + region.name = "TestRegion" + region.__str__.return_value = "TestRegion" + site.region = region + + # Set device site + self.mock_device.site = site + + # Set up role information + self.mock_device_role = MagicMock() + self.mock_device_role.name = "TestRole" + self.mock_device_role.__str__.return_value = "TestRole" + self.mock_device.role = self.mock_device_role + + # Set up rack information + rack = MagicMock() + rack.name = "TestRack" + self.mock_device.rack = rack + + # Set up platform information + platform = MagicMock() + platform.name = "TestPlatform" + self.mock_device.platform = platform + + # Device-specific properties + device_type = MagicMock() + manufacturer = MagicMock() + manufacturer.name = "TestManufacturer" + device_type.manufacturer = manufacturer + self.mock_device.device_type = device_type + + # Create mock VM + self.mock_vm = MagicMock() + self.mock_vm.name = "test-vm" + + # Reuse site from device + self.mock_vm.site = site + + # Set up role for VM + self.mock_vm.role = self.mock_device_role + + # Set up platform for VM + self.mock_vm.platform = platform + + # VM-specific properties + cluster = MagicMock() + cluster.name = "TestCluster" + cluster_type = MagicMock() + cluster_type.name = "TestClusterType" + cluster.type = cluster_type + self.mock_vm.cluster = cluster + + def test_verify_list_based_hostgroup_format(self): + """Test verification of list-based hostgroup formats.""" + # List format with valid items + valid_format = ["region", "site", "rack"] + + # List format with nested path + valid_nested_format = ["region", "site/rack"] + + # List format with invalid item + invalid_format = ["region", "invalid_item", "rack"] + + # Should not raise exception for valid formats + verify_hg_format(valid_format, hg_type="dev", logger=self.mock_logger) + verify_hg_format(valid_nested_format, hg_type="dev", logger=self.mock_logger) + + # Should raise exception for invalid format + with self.assertRaises(HostgroupError): + verify_hg_format(invalid_format, hg_type="dev", logger=self.mock_logger) + + def test_simulate_hostgroup_generation_from_config(self): + """Simulate how the main script would generate hostgroups from list-based config.""" + # Mock configuration with list-based hostgroup format + config_format = ["region", "site", "rack"] + hostgroup = Hostgroup("dev", self.mock_device, "4.0", self.mock_logger) + + # Simulate the main script's hostgroup generation process + hostgroups = [] + for fmt in config_format: + result = hostgroup.generate(fmt) + if result: + hostgroups.append(result) + + # Check results + self.assertEqual(len(hostgroups), 3) + self.assertIn("TestRegion", hostgroups) + self.assertIn("TestSite", hostgroups) + self.assertIn("TestRack", hostgroups) + + def test_vm_hostgroup_format_from_config(self): + """Test VM hostgroup generation with list-based format.""" + # Mock VM configuration with mixed format + config_format = ["platform", "role", "cluster_type/cluster"] + hostgroup = Hostgroup("vm", self.mock_vm, "4.0", self.mock_logger) + + # Simulate the main script's hostgroup generation process + hostgroups = [] + for fmt in config_format: + result = hostgroup.generate(fmt) + if result: + hostgroups.append(result) + + # Check results + self.assertEqual(len(hostgroups), 3) + self.assertIn("TestPlatform", hostgroups) + self.assertIn("TestRole", hostgroups) + self.assertIn("TestClusterType/TestCluster", hostgroups) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_usermacros.py b/tests/test_usermacros.py index 28305af..5c2b6a4 100644 --- a/tests/test_usermacros.py +++ b/tests/test_usermacros.py @@ -88,7 +88,7 @@ class TestZabbixUsermacros(unittest.TestCase): macros = ZabbixUsermacros(self.nb, {}, False, logger=self.logger) result = macros.render_macro("{$FOO}", {"type": "text"}) self.assertFalse(result) - self.logger.warning.assert_called() + self.logger.info.assert_called() def test_render_macro_str(self): macros = ZabbixUsermacros(self.nb, {}, False, logger=self.logger) @@ -102,7 +102,7 @@ class TestZabbixUsermacros(unittest.TestCase): macros = ZabbixUsermacros(self.nb, {}, False, logger=self.logger) result = macros.render_macro("FOO", "bar") self.assertFalse(result) - self.logger.error.assert_called() + self.logger.warning.assert_called() def test_generate_from_map(self): nb = DummyNB(memory="bar", role="baz")