Merge pull request #5 from retigra/develop

Develop
This commit is contained in:
Raymond Kuiper 2025-09-12 15:40:33 +02:00 committed by GitHub
commit b9cf7b5bbe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 639 additions and 293 deletions

View File

@ -2,7 +2,6 @@
name: Pylint Quality control name: Pylint Quality control
on: on:
push:
pull_request: pull_request:
workflow_call: workflow_call:

View File

@ -2,7 +2,6 @@
name: Pytest code testing name: Pytest code testing
on: on:
push:
pull_request: pull_request:
workflow_call: workflow_call:

1
.gitignore vendored
View File

@ -1,5 +1,6 @@
*.log *.log
.venv .venv
.env
config.py config.py
Pipfile Pipfile
Pipfile.lock Pipfile.lock

View File

@ -51,7 +51,7 @@ pip.
```sh ```sh
# Packages: # Packages:
pynetbox pynetbox
pyzabbix zabbix-utils
# Install them through requirements.txt from a venv: # Install them through requirements.txt from a venv:
virtualenv .venv virtualenv .venv
@ -212,6 +212,17 @@ in `config.py` the script will render a full region path of all parent regions
for the hostgroup name. `traverse_site_groups` controls the same behaviour for for the hostgroup name. `traverse_site_groups` controls the same behaviour for
site_groups. site_groups.
**Hardcoded text**
You can add hardcoded text in the hostgroup format by using quotes, this will
insert the literal text:
```python
hostgroup_format = "'MyDevices'/location/role"
```
In this case, the prefix MyDevices will be used for all generated groups.
**Custom fields** **Custom fields**
You can use the value of custom fields for hostgroup generation. This allows You can use the value of custom fields for hostgroup generation. This allows
@ -278,6 +289,27 @@ hostgroup_format = "mycustomfieldname"
NetBox-Zabbix-sync - ERROR - ESXI1 has no reliable hostgroup. This is most likely due to the use of custom fields that are empty. NetBox-Zabbix-sync - ERROR - ESXI1 has no reliable hostgroup. This is most likely due to the use of custom fields that are empty.
``` ```
### Extended site properties
By default, NetBox will only return the following properties under the 'site' key for a device:
- site id
- (api) url
- display name
- name
- slug
- description
However, NetBox-Zabbix-Sync allows you to extend these site properties with the full site information
so you can use this data in inventory fields, tags and usermacros.
To enable this functionality, enable the following setting in your configuration file:
`extended_site_properties = True`
Keep in mind that enabling this option will increase the number of API calls to your NetBox instance,
this might impact performance on large syncs.
### Device status ### Device status
By setting a status on a NetBox device you determine how the host is added (or By setting a status on a NetBox device you determine how the host is added (or
@ -382,9 +414,9 @@ Tags can be synced from the following sources:
Syncing tags will override any tags that were set manually on the host, Syncing tags will override any tags that were set manually on the host,
making NetBox the single source-of-truth for managing tags. making NetBox the single source-of-truth for managing tags.
To enable syncing, turn on tag_sync in the config file. To enable syncing, turn on `tag_sync` in the config file.
By default, this script will modify tag names and tag values to lowercase. By default, this script will modify tag names and tag values to lowercase.
You can change this behaviour by setting tag_lower to False. You can change this behaviour by setting `tag_lower` to `False`.
```python ```python
tag_sync = True tag_sync = True
@ -397,7 +429,8 @@ As NetBox doesn't follow the tag/value pattern for tags, we will need a tag
name set to register the netbox tags. name set to register the netbox tags.
By default the tag name is "NetBox", but you can change this to whatever you want. By default the tag name is "NetBox", but you can change this to whatever you want.
The value for the tag can be set to 'name', 'display', or 'slug', which refers to the property of the NetBox tag object that will be used as the value in Zabbix. The value for the tag can be set to 'name', 'display', or 'slug', which refers to the
property of the NetBox tag object that will be used as the value in Zabbix.
```python ```python
tag_name = 'NetBox' tag_name = 'NetBox'
@ -480,7 +513,7 @@ Through this method, it is possible to define the following types of usermacros:
2. Secret 2. Secret
3. Vault 3. Vault
The default macro type is text if no `type` and `value` have been set. The default macro type is text, if no `type` and `value` have been set.
It is also possible to create usermacros with It is also possible to create usermacros with
[context](https://www.zabbix.com/documentation/7.0/en/manual/config/macros/user_macros_context). [context](https://www.zabbix.com/documentation/7.0/en/manual/config/macros/user_macros_context).
@ -600,7 +633,8 @@ python3 netbox_zabbix_sync.py
### Zabbix proxy ### Zabbix proxy
You can set the proxy for a device using the 'proxy' key in config context. #### Config Context
You can set the proxy for a device using the `proxy` key in config context.
```json ```json
{ {
@ -641,6 +675,34 @@ In the example above the host will use the group on Zabbix 7. On Zabbix 6 and
below the host will use the proxy. Zabbix 7 will use the proxy value when below the host will use the proxy. Zabbix 7 will use the proxy value when
omitting the proxy_group value. omitting the proxy_group value.
#### Custom Field
Alternatively, you can use a custom field for assigning a device or VM to
a Zabbix proxy or proxy group. The custom fields can be assigned to both
Devices and VMs.
You can also assign these custom fields to a site to allow all devices/VMs
in that site to be configured with the same proxy or proxy group.
In order for this to work, `extended_site_properties` needs to be enabled in
the configuation as well.
To use the custom fields for proxy configuration, configure one or both
of the following settings in the configuration file with the actual names of your
custom fields:
```python
proxy_cf = "zabbix_proxy"
proxy_group_cf = "zabbix_proxy_group"
```
As with config context proxy configuration, proxy group will take precedence over
standalone proxy when configured.
Proxy settings configured on the device or VM will in their turn take precedence
over any site configuration.
If the custom fields have no value but the proxy or proxy group is configured in config context,
that setting will be used.
### Set interface parameters within NetBox ### Set interface parameters within NetBox
When adding a new device, you can set the interface type with custom context. By When adding a new device, you can set the interface type with custom context. By

View File

@ -53,6 +53,12 @@ hostgroup_format = "site/manufacturer/role"
traverse_regions = False traverse_regions = False
traverse_site_groups = False traverse_site_groups = False
## Extended site properteis
# By default, NetBox will only return basic site info for any device or VM.
# By setting `extended_site_properties` to True, the script will query NetBox for additiopnal site info.
# Be aware that this will increase the number of API queries to NetBox.
extended_site_properties = False
## Filtering ## Filtering
# Custom device filter, variable must be present but can be left empty with no filtering. # Custom device filter, variable must be present but can be left empty with no filtering.
# A couple of examples: # A couple of examples:
@ -96,6 +102,8 @@ device_inventory_map = { "asset_tag": "asset_tag",
"device_type/model": "type", "device_type/model": "type",
"device_type/manufacturer/name": "vendor", "device_type/manufacturer/name": "vendor",
"oob_ip/address": "oob_ip" } "oob_ip/address": "oob_ip" }
# Replace latitude and longitude with site/latitude and and site/longitude to use
# site geo data. Enable extended_site_properties for this to work!
# We also support inventory mapping on Virtual Machines. # We also support inventory mapping on Virtual Machines.
vm_inventory_map = { "status/label": "deployment_status", vm_inventory_map = { "status/label": "deployment_status",
@ -112,19 +120,19 @@ usermacro_sync = False
# device usermacro_map to map NetBox fields to usermacros. # device usermacro_map to map NetBox fields to usermacros.
device_usermacro_map = {"serial": "{$HW_SERIAL}", device_usermacro_map = {"serial": "{$HW_SERIAL}",
"role/name": "{$DEV_ROLE}", "role/name": "{$DEV_ROLE}",
"url": "{$NB_URL}", "display_url": "{$NB_URL}",
"id": "{$NB_ID}"} "id": "{$NB_ID}"}
# virtual machine usermacro_map to map NetBox fields to usermacros. # virtual machine usermacro_map to map NetBox fields to usermacros.
vm_usermacro_map = {"memory": "{$TOTAL_MEMORY}", vm_usermacro_map = {"memory": "{$TOTAL_MEMORY}",
"role/name": "{$DEV_ROLE}", "role/name": "{$DEV_ROLE}",
"url": "{$NB_URL}", "display_url": "{$NB_URL}",
"id": "{$NB_ID}"} "id": "{$NB_ID}"}
# To sync host tags to Zabbix, set to True. # To sync host tags to Zabbix, set to True.
tag_sync = False tag_sync = False
# Setting tag_lower to True will lower capital letters ain tag names and values # Setting tag_lower to True will lower capital letters in tag names and values
# This is more inline with the Zabbix way of working with tags. # This is more inline with the Zabbix way of working with tags.
# #
# You can however set this to False to ensure capital letters are synced to Zabbix tags. # You can however set this to False to ensure capital letters are synced to Zabbix tags.
@ -133,8 +141,6 @@ tag_lower = True
# We can sync NetBox device/VM tags to Zabbix, but as NetBox tags don't follow the key/value # We can sync NetBox device/VM tags to Zabbix, but as NetBox tags don't follow the key/value
# pattern, we need to specify a tag name to register the NetBox tags in Zabbix. # pattern, we need to specify a tag name to register the NetBox tags in Zabbix.
# #
#
#
# If tag_name is set to False, we won't sync NetBox device/VM tags to Zabbix. # If tag_name is set to False, we won't sync NetBox device/VM tags to Zabbix.
tag_name = 'NetBox' tag_name = 'NetBox'

View File

@ -16,6 +16,8 @@ DEFAULT_CONFIG = {
"templates_config_context_overrule": False, "templates_config_context_overrule": False,
"template_cf": "zabbix_template", "template_cf": "zabbix_template",
"device_cf": "zabbix_hostid", "device_cf": "zabbix_hostid",
"proxy_cf": False,
"proxy_group_cf" : False,
"clustering": False, "clustering": False,
"create_hostgroups": True, "create_hostgroups": True,
"create_journal": False, "create_journal": False,
@ -31,6 +33,7 @@ DEFAULT_CONFIG = {
"nb_vm_filter": {"name__n": "null"}, "nb_vm_filter": {"name__n": "null"},
"inventory_mode": "disabled", "inventory_mode": "disabled",
"inventory_sync": False, "inventory_sync": False,
"extended_site_properties": False,
"device_inventory_map": { "device_inventory_map": {
"asset_tag": "asset_tag", "asset_tag": "asset_tag",
"virtual_chassis/name": "chassis", "virtual_chassis/name": "chassis",

View File

@ -7,6 +7,7 @@ from copy import deepcopy
from logging import getLogger from logging import getLogger
from re import search from re import search
from operator import itemgetter from operator import itemgetter
from typing import Any
from zabbix_utils import APIRequestError from zabbix_utils import APIRequestError
from pynetbox import RequestError as NetboxRequestError from pynetbox import RequestError as NetboxRequestError
@ -20,12 +21,13 @@ from modules.exceptions import (
from modules.hostgroups import Hostgroup from modules.hostgroups import Hostgroup
from modules.interface import ZabbixInterface from modules.interface import ZabbixInterface
from modules.tags import ZabbixTags from modules.tags import ZabbixTags
from modules.tools import field_mapper, remove_duplicates, sanatize_log_output from modules.tools import field_mapper, cf_to_string, remove_duplicates, sanatize_log_output
from modules.usermacros import ZabbixUsermacros from modules.usermacros import ZabbixUsermacros
from modules.config import load_config from modules.config import load_config
config = load_config() config = load_config()
class PhysicalDevice: class PhysicalDevice:
# pylint: disable=too-many-instance-attributes, too-many-arguments, too-many-positional-arguments # pylint: disable=too-many-instance-attributes, too-many-arguments, too-many-positional-arguments
""" """
@ -48,6 +50,7 @@ class PhysicalDevice:
self.zbx_template_names = [] self.zbx_template_names = []
self.zbx_templates = [] self.zbx_templates = []
self.hostgroups = [] self.hostgroups = []
self.hostgroup_type = "dev"
self.tenant = nb.tenant self.tenant = nb.tenant
self.config_context = nb.config_context self.config_context = nb.config_context
self.zbxproxy = None self.zbxproxy = None
@ -96,8 +99,8 @@ class PhysicalDevice:
if config["device_cf"] in self.nb.custom_fields: if config["device_cf"] in self.nb.custom_fields:
self.zabbix_id = self.nb.custom_fields[config["device_cf"]] self.zabbix_id = self.nb.custom_fields[config["device_cf"]]
else: else:
e = f'Host {self.name}: Custom field {config["device_cf"]} not present' e = f"Host {self.name}: Custom field {config['device_cf']} not present"
self.logger.warning(e) self.logger.error(e)
raise SyncInventoryError(e) raise SyncInventoryError(e)
# Validate hostname format. # Validate hostname format.
@ -110,9 +113,11 @@ class PhysicalDevice:
self.visible_name = self.nb.name self.visible_name = self.nb.name
self.use_visible_name = True self.use_visible_name = True
self.logger.info( self.logger.info(
f"Host {self.visible_name} contains special characters. " "Host %s contains special characters."
f"Using {self.name} as name for the NetBox object " "Using %s as name for the NetBox object and using %s as visible name in Zabbix.",
f"and using {self.visible_name} as visible name in Zabbix." self.visible_name,
self.name,
self.visible_name,
) )
else: else:
pass pass
@ -121,12 +126,12 @@ class PhysicalDevice:
"""Set the hostgroup for this device""" """Set the hostgroup for this device"""
# Create new Hostgroup instance # Create new Hostgroup instance
hg = Hostgroup( hg = Hostgroup(
"dev", self.hostgroup_type,
self.nb, self.nb,
self.nb_api_version, self.nb_api_version,
logger=self.logger, logger=self.logger,
nested_sitegroup_flag=config['traverse_site_groups'], nested_sitegroup_flag=config["traverse_site_groups"],
nested_region_flag=config['traverse_regions'], nested_region_flag=config["traverse_regions"],
nb_groups=nb_site_groups, nb_groups=nb_site_groups,
nb_regions=nb_regions, nb_regions=nb_regions,
) )
@ -135,6 +140,14 @@ class PhysicalDevice:
self.hostgroups = [hg.generate(f) for f in hg_format] self.hostgroups = [hg.generate(f) for f in hg_format]
else: else:
self.hostgroups.append(hg.generate(hg_format)) self.hostgroups.append(hg.generate(hg_format))
# Remove duplicates and None values
self.hostgroups = list(filter(None, list(set(self.hostgroups))))
if self.hostgroups:
self.logger.debug(
"Host %s: Should be member of groups: %s", self.name, self.hostgroups
)
return True
return False
def set_template(self, prefer_config_context, overrule_custom): def set_template(self, prefer_config_context, overrule_custom):
"""Set Template""" """Set Template"""
@ -177,8 +190,6 @@ class PhysicalDevice:
self.logger.warning(e) self.logger.warning(e)
raise TemplateError(e) raise TemplateError(e)
def get_templates_context(self): def get_templates_context(self):
"""Get Zabbix templates from the device context""" """Get Zabbix templates from the device context"""
if "zabbix" not in self.config_context: if "zabbix" not in self.config_context:
@ -203,9 +214,11 @@ class PhysicalDevice:
# Set inventory mode. Default is disabled (see class init function). # Set inventory mode. Default is disabled (see class init function).
if config["inventory_mode"] == "disabled": if config["inventory_mode"] == "disabled":
if config["inventory_sync"]: if config["inventory_sync"]:
self.logger.error(f"Host {self.name}: Unable to map NetBox inventory to Zabbix. " self.logger.error(
"Inventory sync is enabled in " "Host %s: Unable to map NetBox inventory to Zabbix."
"config but inventory mode is disabled.") "Inventory sync is enabled in config but inventory mode is disabled",
self.name,
)
return True return True
if config["inventory_mode"] == "manual": if config["inventory_mode"] == "manual":
self.inventory_mode = 0 self.inventory_mode = 0
@ -213,16 +226,20 @@ class PhysicalDevice:
self.inventory_mode = 1 self.inventory_mode = 1
else: else:
self.logger.error( self.logger.error(
f"Host {self.name}: Specified value for inventory mode in" "Host %s: Specified value for inventory mode in config is not valid. Got value %s",
f" config is not valid. Got value {config['inventory_mode']}" self.name,
config["inventory_mode"],
) )
return False return False
self.inventory = {} self.inventory = {}
if config["inventory_sync"] and self.inventory_mode in [0, 1]: if config["inventory_sync"] and self.inventory_mode in [0, 1]:
self.logger.debug(f"Host {self.name}: Starting inventory mapper") self.logger.debug("Host %s: Starting inventory mapper.", self.name)
self.inventory = field_mapper( self.inventory = field_mapper(
self.name, self._inventory_map(), nbdevice, self.logger self.name, self._inventory_map(), nbdevice, self.logger
) )
self.logger.debug(
"Host %s: Resolved inventory: %s", self.name, self.inventory
)
return True return True
def isCluster(self): def isCluster(self):
@ -240,14 +257,14 @@ class PhysicalDevice:
f"Unable to proces {self.name} for cluster calculation: " f"Unable to proces {self.name} for cluster calculation: "
f"not part of a cluster." f"not part of a cluster."
) )
self.logger.warning(e) self.logger.info(e)
raise SyncInventoryError(e) raise SyncInventoryError(e)
if not self.nb.virtual_chassis.master: if not self.nb.virtual_chassis.master:
e = ( e = (
f"{self.name} is part of a NetBox virtual chassis which does " f"{self.name} is part of a NetBox virtual chassis which does "
"not have a master configured. Skipping for this reason." "not have a master configured. Skipping for this reason."
) )
self.logger.error(e) self.logger.warning(e)
raise SyncInventoryError(e) raise SyncInventoryError(e)
return self.nb.virtual_chassis.master.id return self.nb.virtual_chassis.master.id
@ -259,14 +276,15 @@ class PhysicalDevice:
""" """
masterid = self.getClusterMaster() masterid = self.getClusterMaster()
if masterid == self.id: if masterid == self.id:
self.logger.debug( self.logger.info(
f"Host {self.name} is primary cluster member. " "Host %s is primary cluster member. Modifying hostname from %s to %s.",
f"Modifying hostname from {self.name} to " self.name,
+ f"{self.nb.virtual_chassis.name}." self.name,
self.nb.virtual_chassis.name,
) )
self.name = self.nb.virtual_chassis.name self.name = self.nb.virtual_chassis.name
return True return True
self.logger.debug(f"Host {self.name} is non-primary cluster member.") self.logger.info("Host %s is non-primary cluster member.", self.name)
return False return False
def zbxTemplatePrepper(self, templates): def zbxTemplatePrepper(self, templates):
@ -278,7 +296,7 @@ class PhysicalDevice:
# Check if there are templates defined # Check if there are templates defined
if not self.zbx_template_names: if not self.zbx_template_names:
e = f"Host {self.name}: No templates found" e = f"Host {self.name}: No templates found"
self.logger.info(e) self.logger.warning(e)
raise SyncInventoryError() raise SyncInventoryError()
# Set variable to empty list # Set variable to empty list
self.zbx_templates = [] self.zbx_templates = []
@ -298,7 +316,10 @@ class PhysicalDevice:
"name": zbx_template["name"], "name": zbx_template["name"],
} }
) )
e = f"Host {self.name}: found template {zbx_template['name']}" e = (
f"Host {self.name}: Found template '{zbx_template['name']}' "
f"(ID:{zbx_template['templateid']})"
)
self.logger.debug(e) self.logger.debug(e)
# Return error should the template not be found in Zabbix # Return error should the template not be found in Zabbix
if not template_match: if not template_match:
@ -321,8 +342,8 @@ class PhysicalDevice:
if group["name"] == hg: if group["name"] == hg:
self.group_ids.append({"groupid": group["groupid"]}) self.group_ids.append({"groupid": group["groupid"]})
e = ( e = (
f"Host {self.name}: matched group " f"Host {self.name}: Matched group "
f"\"{group['name']}\" (ID:{group['groupid']})" f'"{group["name"]}" (ID:{group["groupid"]})'
) )
self.logger.debug(e) self.logger.debug(e)
if len(self.group_ids) == len(self.hostgroups): if len(self.group_ids) == len(self.hostgroups):
@ -403,7 +424,7 @@ class PhysicalDevice:
macros = ZabbixUsermacros( macros = ZabbixUsermacros(
self.nb, self.nb,
self._usermacro_map(), self._usermacro_map(),
config['usermacro_sync'], config["usermacro_sync"],
logger=self.logger, logger=self.logger,
host=self.name, host=self.name,
) )
@ -421,46 +442,56 @@ class PhysicalDevice:
tags = ZabbixTags( tags = ZabbixTags(
self.nb, self.nb,
self._tag_map(), self._tag_map(),
config['tag_sync'], tag_sync=config["tag_sync"],
config['tag_lower'], tag_lower=config["tag_lower"],
tag_name=config['tag_name'], tag_name=config["tag_name"],
tag_value=config['tag_value'], tag_value=config["tag_value"],
logger=self.logger, logger=self.logger,
host=self.name, host=self.name,
) )
if tags.sync is False: if config["tag_sync"] is False:
self.tags = [] self.tags = []
return False
self.tags = tags.generate() self.tags = tags.generate()
return True return True
def setProxy(self, proxy_list): def _setProxy(self, proxy_list: list[dict[str, Any]]) -> bool:
""" """
Sets proxy or proxy group if this Sets proxy or proxy group if this
value has been defined in config context value has been defined in config context
or custom fields.
input: List of all proxies and proxy groups in standardized format input: List of all proxies and proxy groups in standardized format
""" """
# check if the key Zabbix is defined in the config context
if "zabbix" not in self.nb.config_context:
return False
if (
"proxy" in self.nb.config_context["zabbix"]
and not self.nb.config_context["zabbix"]["proxy"]
):
return False
# Proxy group takes priority over a proxy due # Proxy group takes priority over a proxy due
# to it being HA and therefore being more reliable # to it being HA and therefore being more reliable
# Includes proxy group fix since Zabbix <= 6 should ignore this # Includes proxy group fix since Zabbix <= 6 should ignore this
proxy_types = ["proxy"] proxy_types = ["proxy"]
if str(self.zabbix.version).startswith("7"): proxy_name = None
if self.zabbix.version >= 7.0:
# Only insert groups in front of list for Zabbix7 # Only insert groups in front of list for Zabbix7
proxy_types.insert(0, "proxy_group") proxy_types.insert(0, "proxy_group")
# loop through supported proxy-types
for proxy_type in proxy_types: for proxy_type in proxy_types:
# Check if the key exists in NetBox CC # Check if we should use custom fields for proxy config
if proxy_type in self.nb.config_context["zabbix"]: field_config = "proxy_cf" if proxy_type=="proxy" else "proxy_group_cf"
if config[field_config]:
if config[field_config] in self.nb.custom_fields:
if self.nb.custom_fields[config[field_config]]:
proxy_name = cf_to_string(self.nb.custom_fields[config[field_config]])
elif config[field_config] in self.nb.site.custom_fields:
if self.nb.site.custom_fields[config[field_config]]:
proxy_name = cf_to_string(self.nb.site.custom_fields[config[field_config]])
# Otherwise check if the proxy is configured in NetBox CC
if (not proxy_name and "zabbix" in self.nb.config_context and
proxy_type in self.nb.config_context["zabbix"]):
proxy_name = self.nb.config_context["zabbix"][proxy_type] proxy_name = self.nb.config_context["zabbix"][proxy_type]
# go through all proxies
# If a proxy name was found, loop through all proxies to find a match
if proxy_name:
for proxy in proxy_list: for proxy in proxy_list:
# If the proxy does not match the type, ignore and continue # If the proxy does not match the type, ignore and continue
if not proxy["type"] == proxy_type: if not proxy["type"] == proxy_type:
@ -468,12 +499,13 @@ class PhysicalDevice:
# If the proxy name matches # If the proxy name matches
if proxy["name"] == proxy_name: if proxy["name"] == proxy_name:
self.logger.debug( self.logger.debug(
f"Host {self.name}: using {proxy['type']}" f" {proxy_name}" "Host %s: using {proxy['type']} '%s'", self.name, proxy_name
) )
self.zbxproxy = proxy self.zbxproxy = proxy
return True return True
self.logger.warning( self.logger.warning(
f"Host {self.name}: unable to find proxy {proxy_name}" "Host %s: unable to find proxy %s", self.name, proxy_name
) )
return False return False
@ -503,16 +535,15 @@ class PhysicalDevice:
templateids.append({"templateid": template["templateid"]}) templateids.append({"templateid": template["templateid"]})
# Set interface, group and template configuration # Set interface, group and template configuration
interfaces = self.setInterfaceDetails() interfaces = self.setInterfaceDetails()
groups = self.group_ids
# Set Zabbix proxy if defined # Set Zabbix proxy if defined
self.setProxy(proxies) self._setProxy(proxies)
# Set basic data for host creation # Set basic data for host creation
create_data = { create_data = {
"host": self.name, "host": self.name,
"name": self.visible_name, "name": self.visible_name,
"status": self.zabbix_state, "status": self.zabbix_state,
"interfaces": interfaces, "interfaces": interfaces,
"groups": groups, "groups": self.group_ids,
"templates": templateids, "templates": templateids,
"description": description, "description": description,
"inventory_mode": self.inventory_mode, "inventory_mode": self.inventory_mode,
@ -541,12 +572,12 @@ class PhysicalDevice:
# Set NetBox custom field to hostID value. # Set NetBox custom field to hostID value.
self.nb.custom_fields[config["device_cf"]] = int(self.zabbix_id) self.nb.custom_fields[config["device_cf"]] = int(self.zabbix_id)
self.nb.save() self.nb.save()
msg = f"Host {self.name}: Created host in Zabbix." msg = f"Host {self.name}: Created host in Zabbix. (ID:{self.zabbix_id})"
self.logger.info(msg) self.logger.info(msg)
self.create_journal_entry("success", msg) self.create_journal_entry("success", msg)
else: else:
self.logger.error( self.logger.error(
f"Host {self.name}: Unable to add to Zabbix. Host already present." "Host %s: Unable to add to Zabbix. Host already present.", self.name
) )
def createZabbixHostgroup(self, hostgroups): def createZabbixHostgroup(self, hostgroups):
@ -604,7 +635,9 @@ class PhysicalDevice:
) )
self.logger.error(e) self.logger.error(e)
raise SyncExternalError(e) from None raise SyncExternalError(e) from None
self.logger.info(f"Host {self.name}: updated with data {sanatize_log_output(kwargs)}.") self.logger.info(
"Host %s: updated with data %s.", self.name, sanatize_log_output(kwargs)
)
self.create_journal_entry("info", "Updated host in Zabbix with latest NB data.") self.create_journal_entry("info", "Updated host in Zabbix with latest NB data.")
def ConsistencyCheck( def ConsistencyCheck(
@ -632,12 +665,10 @@ class PhysicalDevice:
) )
self.logger.warning(e) self.logger.warning(e)
raise SyncInventoryError(e) raise SyncInventoryError(e)
#if self.group_ids:
# self.group_ids.append(self.pri_group_id)
# Prepare templates and proxy config # Prepare templates and proxy config
self.zbxTemplatePrepper(templates) self.zbxTemplatePrepper(templates)
self.setProxy(proxies) self._setProxy(proxies)
# Get host object from Zabbix # Get host object from Zabbix
host = self.zabbix.host.get( host = self.zabbix.host.get(
filter={"hostid": self.zabbix_id}, filter={"hostid": self.zabbix_id},
@ -666,28 +697,30 @@ class PhysicalDevice:
raise SyncInventoryError(e) raise SyncInventoryError(e)
host = host[0] host = host[0]
if host["host"] == self.name: if host["host"] == self.name:
self.logger.debug(f"Host {self.name}: hostname in-sync.") self.logger.debug("Host %s: Hostname in-sync.", self.name)
else: else:
self.logger.warning( self.logger.info(
f"Host {self.name}: hostname OUT of sync. " "Host %s: Hostname OUT of sync. Received value: %s",
f"Received value: {host['host']}" self.name,
host["host"],
) )
self.updateZabbixHost(host=self.name) self.updateZabbixHost(host=self.name)
# Execute check depending on wether the name is special or not # Execute check depending on wether the name is special or not
if self.use_visible_name: if self.use_visible_name:
if host["name"] == self.visible_name: if host["name"] == self.visible_name:
self.logger.debug(f"Host {self.name}: visible name in-sync.") self.logger.debug("Host %s: Visible name in-sync.", self.name)
else: else:
self.logger.warning( self.logger.info(
f"Host {self.name}: visible name OUT of sync." "Host %s: Visible name OUT of sync. Received value: %s",
f" Received value: {host['name']}" self.name,
host["name"],
) )
self.updateZabbixHost(name=self.visible_name) self.updateZabbixHost(name=self.visible_name)
# Check if the templates are in-sync # Check if the templates are in-sync
if not self.zbx_template_comparer(host["parentTemplates"]): if not self.zbx_template_comparer(host["parentTemplates"]):
self.logger.warning(f"Host {self.name}: template(s) OUT of sync.") self.logger.info("Host %s: Template(s) OUT of sync.", self.name)
# Prepare Templates for API parsing # Prepare Templates for API parsing
templateids = [] templateids = []
for template in self.zbx_templates: for template in self.zbx_templates:
@ -697,38 +730,41 @@ class PhysicalDevice:
templates_clear=host["parentTemplates"], templates=templateids templates_clear=host["parentTemplates"], templates=templateids
) )
else: else:
self.logger.debug(f"Host {self.name}: template(s) in-sync.") self.logger.debug("Host %s: Template(s) in-sync.", self.name)
# Check if Zabbix version is 6 or higher. Issue #93 # Check if Zabbix version is 6 or higher. Issue #93
group_dictname = "hostgroups" group_dictname = "hostgroups"
if str(self.zabbix.version).startswith(("6", "5")): if str(self.zabbix.version).startswith(("6", "5")):
group_dictname = "groups" group_dictname = "groups"
# Check if hostgroups match # Check if hostgroups match
if (sorted(host[group_dictname], key=itemgetter('groupid')) == if sorted(host[group_dictname], key=itemgetter("groupid")) == sorted(
sorted(self.group_ids, key=itemgetter('groupid'))): self.group_ids, key=itemgetter("groupid")
self.logger.debug(f"Host {self.name}: hostgroups in-sync.") ):
self.logger.debug("Host %s: Hostgroups in-sync.", self.name)
else: else:
self.logger.warning(f"Host {self.name}: hostgroups OUT of sync.") self.logger.info("Host %s: Hostgroups OUT of sync.", self.name)
self.updateZabbixHost(groups=self.group_ids) self.updateZabbixHost(groups=self.group_ids)
if int(host["status"]) == self.zabbix_state: if int(host["status"]) == self.zabbix_state:
self.logger.debug(f"Host {self.name}: status in-sync.") self.logger.debug("Host %s: Status in-sync.", self.name)
else: else:
self.logger.warning(f"Host {self.name}: status OUT of sync.") self.logger.info("Host %s: Status OUT of sync.", self.name)
self.updateZabbixHost(status=str(self.zabbix_state)) self.updateZabbixHost(status=str(self.zabbix_state))
# Check if a proxy has been defined # Check if a proxy has been defined
if self.zbxproxy: if self.zbxproxy:
# Check if proxy or proxy group is defined # Check if proxy or proxy group is defined
if (self.zbxproxy["idtype"] in host and if (
host[self.zbxproxy["idtype"]] == self.zbxproxy["id"]): self.zbxproxy["idtype"] in host
self.logger.debug(f"Host {self.name}: proxy in-sync.") and host[self.zbxproxy["idtype"]] == self.zbxproxy["id"]
):
self.logger.debug("Host %s: Proxy in-sync.", self.name)
# Backwards compatibility for Zabbix <= 6 # Backwards compatibility for Zabbix <= 6
elif "proxy_hostid" in host and host["proxy_hostid"] == self.zbxproxy["id"]: elif "proxy_hostid" in host and host["proxy_hostid"] == self.zbxproxy["id"]:
self.logger.debug(f"Host {self.name}: proxy in-sync.") self.logger.debug("Host %s: Proxy in-sync.", self.name)
# Proxy does not match, update Zabbix # Proxy does not match, update Zabbix
else: else:
self.logger.warning(f"Host {self.name}: proxy OUT of sync.") self.logger.info("Host %s: Proxy OUT of sync.", self.name)
# Zabbix <= 6 patch # Zabbix <= 6 patch
if not str(self.zabbix.version).startswith("7"): if not str(self.zabbix.version).startswith("7"):
self.updateZabbixHost(proxy_hostid=self.zbxproxy["id"]) self.updateZabbixHost(proxy_hostid=self.zbxproxy["id"])
@ -751,8 +787,9 @@ class PhysicalDevice:
if proxy_power and proxy_set: if proxy_power and proxy_set:
# Zabbix <= 6 fix # Zabbix <= 6 fix
self.logger.warning( self.logger.warning(
f"Host {self.name}: no proxy is configured in NetBox " "Host %s: No proxy is configured in NetBox but is configured in Zabbix."
"but is configured in Zabbix. Removing proxy config in Zabbix" "Removing proxy config in Zabbix",
self.name,
) )
if "proxy_hostid" in host and bool(host["proxy_hostid"]): if "proxy_hostid" in host and bool(host["proxy_hostid"]):
self.updateZabbixHost(proxy_hostid=0) self.updateZabbixHost(proxy_hostid=0)
@ -765,60 +802,61 @@ class PhysicalDevice:
# Checks if a proxy has been defined in Zabbix and if proxy_power config has been set # Checks if a proxy has been defined in Zabbix and if proxy_power config has been set
if proxy_set and not proxy_power: if proxy_set and not proxy_power:
# Display error message # Display error message
self.logger.error( self.logger.warning(
f"Host {self.name} is configured " "Host %s: Is configured with proxy in Zabbix but not in NetBox."
f"with proxy in Zabbix but not in NetBox. The" "full_proxy_sync is not set: no changes have been made.",
" -p flag was ommited: no " self.name,
"changes have been made."
) )
if not proxy_set: if not proxy_set:
self.logger.debug(f"Host {self.name}: proxy in-sync.") self.logger.debug("Host %s: Proxy in-sync.", self.name)
# Check host inventory mode # Check host inventory mode
if str(host["inventory_mode"]) == str(self.inventory_mode): if str(host["inventory_mode"]) == str(self.inventory_mode):
self.logger.debug(f"Host {self.name}: inventory_mode in-sync.") self.logger.debug("Host %s: inventory_mode in-sync.", self.name)
else: else:
self.logger.warning(f"Host {self.name}: inventory_mode OUT of sync.") self.logger.info("Host %s: inventory_mode OUT of sync.", self.name)
self.updateZabbixHost(inventory_mode=str(self.inventory_mode)) self.updateZabbixHost(inventory_mode=str(self.inventory_mode))
if config["inventory_sync"] and self.inventory_mode in [0, 1]: if config["inventory_sync"] and self.inventory_mode in [0, 1]:
# Check host inventory mapping # Check host inventory mapping
if host["inventory"] == self.inventory: if host["inventory"] == self.inventory:
self.logger.debug(f"Host {self.name}: inventory in-sync.") self.logger.debug("Host %s: Inventory in-sync.", self.name)
else: else:
self.logger.warning(f"Host {self.name}: inventory OUT of sync.") self.logger.info("Host %s: Inventory OUT of sync.", self.name)
self.updateZabbixHost(inventory=self.inventory) self.updateZabbixHost(inventory=self.inventory)
# Check host usermacros # Check host usermacros
if config['usermacro_sync']: if config["usermacro_sync"]:
# Make a full copy synce we dont want to lose the original value # Make a full copy synce we dont want to lose the original value
# of secret type macros from Netbox # of secret type macros from Netbox
netbox_macros = deepcopy(self.usermacros) netbox_macros = deepcopy(self.usermacros)
# Set the sync bit # Set the sync bit
full_sync_bit = bool(str(config['usermacro_sync']).lower() == "full") full_sync_bit = bool(str(config["usermacro_sync"]).lower() == "full")
for macro in netbox_macros: for macro in netbox_macros:
# If the Macro is a secret and full sync is NOT activated # If the Macro is a secret and full sync is NOT activated
if macro["type"] == str(1) and not full_sync_bit: if macro["type"] == str(1) and not full_sync_bit:
# Remove the value as the Zabbix api does not return the value key # Remove the value as the Zabbix api does not return the value key
# This is required when you want to do a diff between both lists # This is required when you want to do a diff between both lists
macro.pop("value") macro.pop("value")
# Sort all lists # Sort all lists
def filter_with_macros(macro): def filter_with_macros(macro):
return macro["macro"] return macro["macro"]
host["macros"].sort(key=filter_with_macros) host["macros"].sort(key=filter_with_macros)
netbox_macros.sort(key=filter_with_macros) netbox_macros.sort(key=filter_with_macros)
# Check if both lists are the same # Check if both lists are the same
if host["macros"] == netbox_macros: if host["macros"] == netbox_macros:
self.logger.debug(f"Host {self.name}: usermacros in-sync.") self.logger.debug("Host %s: Usermacros in-sync.", self.name)
else: else:
self.logger.warning(f"Host {self.name}: usermacros OUT of sync.") self.logger.info("Host %s: Usermacros OUT of sync.", self.name)
# Update Zabbix with NetBox usermacros # Update Zabbix with NetBox usermacros
self.updateZabbixHost(macros=self.usermacros) self.updateZabbixHost(macros=self.usermacros)
# Check host tags # Check host tags
if config['tag_sync']: if config["tag_sync"]:
if remove_duplicates(host["tags"], sortkey="tag") == self.tags: if remove_duplicates(host["tags"], sortkey="tag") == self.tags:
self.logger.debug(f"Host {self.name}: tags in-sync.") self.logger.debug("Host %s: Tags in-sync.", self.name)
else: else:
self.logger.warning(f"Host {self.name}: tags OUT of sync.") self.logger.info("Host %s: Tags OUT of sync.", self.name)
self.updateZabbixHost(tags=self.tags) self.updateZabbixHost(tags=self.tags)
# If only 1 interface has been found # If only 1 interface has been found
@ -856,11 +894,11 @@ class PhysicalDevice:
updates[key] = item updates[key] = item
if updates: if updates:
# If interface updates have been found: push to Zabbix # If interface updates have been found: push to Zabbix
self.logger.warning(f"Host {self.name}: Interface OUT of sync.") self.logger.info("Host %s: Interface OUT of sync.", self.name)
if "type" in updates: if "type" in updates:
# Changing interface type not supported. Raise exception. # Changing interface type not supported. Raise exception.
e = ( e = (
f"Host {self.name}: changing interface type to " f"Host {self.name}: Changing interface type to "
f"{str(updates['type'])} is not supported." f"{str(updates['type'])} is not supported."
) )
self.logger.error(e) self.logger.error(e)
@ -870,26 +908,27 @@ class PhysicalDevice:
try: try:
# API call to Zabbix # API call to Zabbix
self.zabbix.hostinterface.update(updates) self.zabbix.hostinterface.update(updates)
e = (f"Host {self.name}: updated interface " err_msg = (
f"with data {sanatize_log_output(updates)}.") f"Host {self.name}: Updated interface "
self.logger.info(e) f"with data {sanatize_log_output(updates)}."
self.create_journal_entry("info", e) )
self.logger.info(err_msg)
self.create_journal_entry("info", err_msg)
except APIRequestError as e: except APIRequestError as e:
msg = f"Zabbix returned the following error: {str(e)}." msg = f"Zabbix returned the following error: {str(e)}."
self.logger.error(msg) self.logger.error(msg)
raise SyncExternalError(msg) from e raise SyncExternalError(msg) from e
else: else:
# If no updates are found, Zabbix interface is in-sync # If no updates are found, Zabbix interface is in-sync
e = f"Host {self.name}: interface in-sync." self.logger.debug("Host %s: Interface in-sync.", self.name)
self.logger.debug(e)
else: else:
e = ( err_msg = (
f"Host {self.name} has unsupported interface configuration." f"Host {self.name}: Has unsupported interface configuration."
f" Host has total of {len(host['interfaces'])} interfaces. " f" Host has total of {len(host['interfaces'])} interfaces. "
"Manual intervention required." "Manual intervention required."
) )
self.logger.error(e) self.logger.error(err_msg)
raise SyncInventoryError(e) raise SyncInventoryError(err_msg)
def create_journal_entry(self, severity, message): def create_journal_entry(self, severity, message):
""" """
@ -900,7 +939,7 @@ class PhysicalDevice:
# Check if the severity is valid # Check if the severity is valid
if severity not in ["info", "success", "warning", "danger"]: if severity not in ["info", "success", "warning", "danger"]:
self.logger.warning( self.logger.warning(
f"Value {severity} not valid for NB journal entries." "Value %s not valid for NB journal entries.", severity
) )
return False return False
journal = { journal = {
@ -911,12 +950,13 @@ class PhysicalDevice:
} }
try: try:
self.nb_journals.create(journal) self.nb_journals.create(journal)
self.logger.debug(f"Host {self.name}: Created journal entry in NetBox") self.logger.debug("Host %s: Created journal entry in NetBox", self.name)
return True return True
except NetboxRequestError as e: except NetboxRequestError as e:
self.logger.warning( self.logger.warning(
"Unable to create journal entry for " "Unable to create journal entry for %s: NB returned %s",
f"{self.name}: NB returned {e}" self.name,
e,
) )
return False return False
return False return False
@ -941,8 +981,9 @@ class PhysicalDevice:
tmpls_from_zabbix.pop(pos) tmpls_from_zabbix.pop(pos)
succesfull_templates.append(nb_tmpl) succesfull_templates.append(nb_tmpl)
self.logger.debug( self.logger.debug(
f"Host {self.name}: template " "Host %s: Template '%s' is present in Zabbix.",
f"{nb_tmpl['name']} is present in Zabbix." self.name,
nb_tmpl["name"],
) )
break break
if ( if (

View File

@ -3,7 +3,7 @@
from logging import getLogger from logging import getLogger
from modules.exceptions import HostgroupError from modules.exceptions import HostgroupError
from modules.tools import build_path from modules.tools import build_path, cf_to_string
class Hostgroup: class Hostgroup:
@ -11,6 +11,7 @@ class Hostgroup:
Takes type (vm or dev) and NB object""" Takes type (vm or dev) and NB object"""
# pylint: disable=too-many-arguments, disable=too-many-positional-arguments # pylint: disable=too-many-arguments, disable=too-many-positional-arguments
# pylint: disable=logging-fstring-interpolation
def __init__( def __init__(
self, self,
obj_type, obj_type,
@ -93,6 +94,11 @@ class Hostgroup:
format_options["cluster"] = self.nb.cluster.name format_options["cluster"] = self.nb.cluster.name
format_options["cluster_type"] = self.nb.cluster.type.name format_options["cluster_type"] = self.nb.cluster.type.name
self.format_options = format_options self.format_options = format_options
self.logger.debug(
"Host %s: Resolved properties for use in hostgroups: %s",
self.name,
self.format_options,
)
def set_nesting( def set_nesting(
self, nested_sitegroup_flag, nested_region_flag, nb_groups, nb_regions self, nested_sitegroup_flag, nested_region_flag, nb_groups, nb_regions
@ -103,19 +109,19 @@ class Hostgroup:
"region": {"flag": nested_region_flag, "data": nb_regions}, "region": {"flag": nested_region_flag, "data": nb_regions},
} }
def generate(self, hg_format=None): def generate(self, hg_format):
"""Generate hostgroup based on a provided format""" """Generate hostgroup based on a provided format"""
# Set format to default in case its not specified
if not hg_format:
hg_format = (
"site/manufacturer/role" if self.type == "dev" else "cluster/role"
)
# Split all given names # Split all given names
hg_output = [] hg_output = []
hg_items = hg_format.split("/") hg_items = hg_format.split("/")
for hg_item in hg_items: for hg_item in hg_items:
# Check if requested data is available as option for this host # Check if requested data is available as option for this host
if hg_item not in self.format_options: if hg_item not in self.format_options:
if hg_item.startswith(("'", '"')) and hg_item.endswith(("'", '"')):
hg_item = hg_item.strip("\'")
hg_item = hg_item.strip('\"')
hg_output.append(hg_item)
else:
# Check if a custom field exists with this name # Check if a custom field exists with this name
cf_data = self.custom_field_lookup(hg_item) cf_data = self.custom_field_lookup(hg_item)
# CF does not exist # CF does not exist
@ -128,24 +134,26 @@ class Hostgroup:
raise HostgroupError(msg) raise HostgroupError(msg)
# CF data is populated # CF data is populated
if cf_data["cf"]: if cf_data["cf"]:
hg_output.append(cf_data["cf"]) hg_output.append(cf_to_string(cf_data["cf"]))
continue continue
# Check if there is a value associated to the variable. # Check if there is a value associated to the variable.
# For instance, if a device has no location, do not use it with hostgroup calculation # For instance, if a device has no location, do not use it with hostgroup calculation
hostgroup_value = self.format_options[hg_item] hostgroup_value = self.format_options[hg_item]
if hostgroup_value: if hostgroup_value:
hg_output.append(hostgroup_value) hg_output.append(hostgroup_value)
else:
self.logger.info(
"Host %s: Used field '%s' has no value.", self.name, hg_item
)
# Check if the hostgroup is populated with at least one item. # Check if the hostgroup is populated with at least one item.
if bool(hg_output): if bool(hg_output):
return "/".join(hg_output) return "/".join(hg_output)
msg = ( msg = (
f"Unable to generate hostgroup for host {self.name}." f"Host {self.name}: Generating hostgroup name for '{hg_format}' failed. "
" Not enough valid items. This is most likely" f"This is most likely due to fields that have no value."
" due to the use of custom fields that are empty"
" or an invalid hostgroup format."
) )
self.logger.error(msg) self.logger.warning(msg)
raise HostgroupError(msg) return None
def list_formatoptions(self): def list_formatoptions(self):
""" """

View File

@ -3,6 +3,7 @@
""" """
All of the Zabbix Usermacro related configuration All of the Zabbix Usermacro related configuration
""" """
from logging import getLogger from logging import getLogger
from modules.tools import field_mapper, remove_duplicates from modules.tools import field_mapper, remove_duplicates
@ -15,7 +16,7 @@ class ZabbixTags:
self, self,
nb, nb,
tag_map, tag_map,
tag_sync, tag_sync=False,
tag_lower=True, tag_lower=True,
tag_name=None, tag_name=None,
tag_value=None, tag_value=None,
@ -76,7 +77,7 @@ class ZabbixTags:
else: else:
tag["tag"] = tag_name tag["tag"] = tag_name
else: else:
self.logger.warning(f"Tag {tag_name} is not a valid tag name, skipping.") self.logger.warning("Tag '%s' is not a valid tag name, skipping.", tag_name)
return False return False
if self.validate_value(tag_value): if self.validate_value(tag_value):
@ -85,8 +86,8 @@ class ZabbixTags:
else: else:
tag["value"] = tag_value tag["value"] = tag_value
else: else:
self.logger.warning( self.logger.info(
f"Tag {tag_name} has an invalid value: '{tag_value}', skipping." "Tag '%s' has an invalid value: '%s', skipping.", tag_name, tag_value
) )
return False return False
return tag return tag
@ -99,7 +100,7 @@ class ZabbixTags:
tags = [] tags = []
# Parse the field mapper for tags # Parse the field mapper for tags
if self.tag_map: if self.tag_map:
self.logger.debug(f"Host {self.nb.name}: Starting tag mapper") self.logger.debug("Host %s: Starting tag mapper.", self.nb.name)
field_tags = field_mapper(self.nb.name, self.tag_map, self.nb, self.logger) field_tags = field_mapper(self.nb.name, self.tag_map, self.nb, self.logger)
for tag, value in field_tags.items(): for tag, value in field_tags.items():
t = self.render_tag(tag, value) t = self.render_tag(tag, value)
@ -130,4 +131,6 @@ class ZabbixTags:
if t: if t:
tags.append(t) tags.append(t)
return remove_duplicates(tags, sortkey="tag") tags = remove_duplicates(tags, sortkey="tag")
self.logger.debug("Host %s: Resolved tags: %s", self.name, tags)
return tags

View File

@ -1,6 +1,8 @@
"""A collection of tools used by several classes""" """A collection of tools used by several classes"""
from modules.exceptions import HostgroupError from modules.exceptions import HostgroupError
def convert_recordset(recordset): def convert_recordset(recordset):
"""Converts netbox RedcordSet to list of dicts.""" """Converts netbox RedcordSet to list of dicts."""
recordlist = [] recordlist = []
@ -48,6 +50,18 @@ def proxy_prepper(proxy_list, proxy_group_list):
return output return output
def cf_to_string(cf, key="name", logger=None):
"""
Converts a dict custom fields to string
"""
if isinstance(cf, dict):
if key in cf:
return cf[key]
logger.error("Conversion of custom field failed, '%s' not found in cf dict.", key)
return None
return cf
def field_mapper(host, mapper, nbdevice, logger): def field_mapper(host, mapper, nbdevice, logger):
""" """
Maps NetBox field data to Zabbix properties. Maps NetBox field data to Zabbix properties.
@ -71,20 +85,23 @@ def field_mapper(host, mapper, nbdevice, logger):
data[zbx_field] = str(value) data[zbx_field] = str(value)
elif not value: elif not value:
# empty value should just be an empty string for API compatibility # empty value should just be an empty string for API compatibility
logger.debug( logger.info(
f"Host {host}: NetBox lookup for " "Host %s: NetBox lookup for '%s' returned an empty value.",
f"'{nb_field}' returned an empty value" host,
nb_field,
) )
data[zbx_field] = "" data[zbx_field] = ""
else: else:
# Value is not a string or numeral, probably not what the user expected. # Value is not a string or numeral, probably not what the user expected.
logger.error( logger.info(
f"Host {host}: Lookup for '{nb_field}'" "Host %s: Lookup for '%s' returned an unexpected type: it will be skipped.",
" returned an unexpected type: it will be skipped." host,
nb_field,
) )
logger.debug( logger.debug(
f"Host {host}: Field mapping complete. " "Host %s: Field mapping complete. Mapped %s field(s).",
f"Mapped {len(list(filter(None, data.values())))} field(s)" host,
len(list(filter(None, data.values()))),
) )
return data return data
@ -101,7 +118,9 @@ def remove_duplicates(input_list, sortkey=None):
return output_list return output_list
def verify_hg_format(hg_format, device_cfs=None, vm_cfs=None, hg_type="dev", logger=None): def verify_hg_format(
hg_format, device_cfs=None, vm_cfs=None, hg_type="dev", logger=None
):
""" """
Verifies hostgroup field format Verifies hostgroup field format
""" """
@ -109,7 +128,9 @@ def verify_hg_format(hg_format, device_cfs=None, vm_cfs=None, hg_type="dev", log
device_cfs = [] device_cfs = []
if not vm_cfs: if not vm_cfs:
vm_cfs = [] vm_cfs = []
allowed_objects = {"dev": ["location", allowed_objects = {
"dev": [
"location",
"rack", "rack",
"role", "role",
"manufacturer", "manufacturer",
@ -119,8 +140,10 @@ def verify_hg_format(hg_format, device_cfs=None, vm_cfs=None, hg_type="dev", log
"tenant", "tenant",
"tenant_group", "tenant_group",
"platform", "platform",
"cluster"] "cluster",
,"vm": ["cluster_type", ],
"vm": [
"cluster_type",
"role", "role",
"manufacturer", "manufacturer",
"region", "region",
@ -130,28 +153,32 @@ def verify_hg_format(hg_format, device_cfs=None, vm_cfs=None, hg_type="dev", log
"tenant_group", "tenant_group",
"cluster", "cluster",
"device", "device",
"platform"] "platform",
,"cfs": {"dev": [], "vm": []} ],
"cfs": {"dev": [], "vm": []},
} }
for cf in device_cfs: for cf in device_cfs:
allowed_objects['cfs']['dev'].append(cf.name) allowed_objects["cfs"]["dev"].append(cf.name)
for cf in vm_cfs: for cf in vm_cfs:
allowed_objects['cfs']['vm'].append(cf.name) allowed_objects["cfs"]["vm"].append(cf.name)
hg_objects = [] hg_objects = []
if isinstance(hg_format,list): if isinstance(hg_format, list):
for f in hg_format: for f in hg_format:
hg_objects = hg_objects + f.split("/") hg_objects = hg_objects + f.split("/")
else: else:
hg_objects = hg_format.split("/") hg_objects = hg_format.split("/")
hg_objects = sorted(set(hg_objects)) hg_objects = sorted(set(hg_objects))
for hg_object in hg_objects: for hg_object in hg_objects:
if (hg_object not in allowed_objects[hg_type] and if (
hg_object not in allowed_objects['cfs'][hg_type]): hg_object not in allowed_objects[hg_type]
and hg_object not in allowed_objects["cfs"][hg_type]
and not hg_object.startswith(('"',"'"))
):
e = ( e = (
f"Hostgroup item {hg_object} is not valid. Make sure you" f"Hostgroup item {hg_object} is not valid. Make sure you"
" use valid items and separate them with '/'." " use valid items and separate them with '/'."
) )
logger.error(e) logger.warning(e)
raise HostgroupError(e) raise HostgroupError(e)
@ -159,7 +186,7 @@ def sanatize_log_output(data):
""" """
Used for the update function to Zabbix which Used for the update function to Zabbix which
shows the data that its using to update the host. shows the data that its using to update the host.
Removes and sensitive data from the input. Removes any sensitive data from the input.
""" """
if not isinstance(data, dict): if not isinstance(data, dict):
return data return data
@ -168,7 +195,7 @@ def sanatize_log_output(data):
if "macros" in data: if "macros" in data:
for macro in sanitized_data["macros"]: for macro in sanitized_data["macros"]:
# Check if macro is secret type # Check if macro is secret type
if not macro["type"] == str(1): if not (macro["type"] == str(1) or macro["type"] == 1):
continue continue
macro["value"] = "********" macro["value"] = "********"
# Check for interface data # Check for interface data

View File

@ -3,10 +3,11 @@
""" """
All of the Zabbix Usermacro related configuration All of the Zabbix Usermacro related configuration
""" """
from logging import getLogger from logging import getLogger
from re import match from re import match
from modules.tools import field_mapper from modules.tools import field_mapper, sanatize_log_output
class ZabbixUsermacros: class ZabbixUsermacros:
@ -57,8 +58,11 @@ class ZabbixUsermacros:
macro["macro"] = str(macro_name) macro["macro"] = str(macro_name)
if isinstance(macro_properties, dict): if isinstance(macro_properties, dict):
if not "value" in macro_properties: if not "value" in macro_properties:
self.logger.warning(f"Host {self.name}: Usermacro {macro_name} has " self.logger.info(
"no value in Netbox, skipping.") "Host %s: Usermacro %s has no value in Netbox, skipping.",
self.name,
macro_name,
)
return False return False
macro["value"] = macro_properties["value"] macro["value"] = macro_properties["value"]
@ -83,12 +87,17 @@ class ZabbixUsermacros:
macro["description"] = "" macro["description"] = ""
else: else:
self.logger.warning(f"Host {self.name}: Usermacro {macro_name} " self.logger.info(
"has no value, skipping.") "Host %s: Usermacro %s has no value, skipping.",
self.name,
macro_name,
)
return False return False
else: else:
self.logger.error( self.logger.warning(
f"Host {self.name}: Usermacro {macro_name} is not a valid usermacro name, skipping." "Host %s: Usermacro %s is not a valid usermacro name, skipping.",
self.name,
macro_name,
) )
return False return False
return macro return macro
@ -98,9 +107,10 @@ class ZabbixUsermacros:
Generate full set of Usermacros Generate full set of Usermacros
""" """
macros = [] macros = []
data = {}
# Parse the field mapper for usermacros # Parse the field mapper for usermacros
if self.usermacro_map: if self.usermacro_map:
self.logger.debug(f"Host {self.nb.name}: Starting usermacro mapper") self.logger.debug("Host %s: Starting usermacro mapper.", self.nb.name)
field_macros = field_mapper( field_macros = field_mapper(
self.nb.name, self.usermacro_map, self.nb, self.logger self.nb.name, self.usermacro_map, self.nb, self.logger
) )
@ -119,4 +129,8 @@ class ZabbixUsermacros:
m = self.render_macro(macro, properties) m = self.render_macro(macro, properties)
if m: if m:
macros.append(m) macros.append(m)
data = {"macros": macros}
self.logger.debug(
"Host %s: Resolved macros: %s", self.name, sanatize_log_output(data)
)
return macros return macros

View File

@ -2,7 +2,6 @@
"""Module that hosts all functions for virtual machine processing""" """Module that hosts all functions for virtual machine processing"""
from modules.device import PhysicalDevice from modules.device import PhysicalDevice
from modules.exceptions import InterfaceConfigError, SyncInventoryError, TemplateError from modules.exceptions import InterfaceConfigError, SyncInventoryError, TemplateError
from modules.hostgroups import Hostgroup
from modules.interface import ZabbixInterface from modules.interface import ZabbixInterface
from modules.config import load_config from modules.config import load_config
# Load config # Load config
@ -16,6 +15,7 @@ class VirtualMachine(PhysicalDevice):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.hostgroup = None self.hostgroup = None
self.zbx_template_names = None self.zbx_template_names = None
self.hostgroup_type = "vm"
def _inventory_map(self): def _inventory_map(self):
"""use VM inventory maps""" """use VM inventory maps"""
@ -29,25 +29,6 @@ class VirtualMachine(PhysicalDevice):
"""use VM tag maps""" """use VM tag maps"""
return config["vm_tag_map"] return config["vm_tag_map"]
def set_hostgroup(self, hg_format, nb_site_groups, nb_regions):
"""Set the hostgroup for this device"""
# Create new Hostgroup instance
hg = Hostgroup(
"vm",
self.nb,
self.nb_api_version,
logger=self.logger,
nested_sitegroup_flag=config["traverse_site_groups"],
nested_region_flag=config["traverse_regions"],
nb_groups=nb_site_groups,
nb_regions=nb_regions,
)
# Generate hostgroup based on hostgroup format
if isinstance(hg_format, list):
self.hostgroups = [hg.generate(f) for f in hg_format]
else:
self.hostgroups.append(hg.generate(hg_format))
def set_vm_template(self): def set_vm_template(self):
"""Set Template for VMs. Overwrites default class """Set Template for VMs. Overwrites default class
to skip a lookup of custom fields.""" to skip a lookup of custom fields."""

View File

@ -2,6 +2,7 @@
# pylint: disable=invalid-name, logging-not-lazy, too-many-locals, logging-fstring-interpolation # pylint: disable=invalid-name, logging-not-lazy, too-many-locals, logging-fstring-interpolation
"""NetBox to Zabbix sync script.""" """NetBox to Zabbix sync script."""
import argparse import argparse
import logging import logging
import ssl import ssl
@ -67,30 +68,36 @@ def main(arguments):
try: try:
# Get NetBox version # Get NetBox version
nb_version = netbox.version nb_version = netbox.version
logger.debug(f"NetBox version is {nb_version}.") logger.debug("NetBox version is %s.", nb_version)
except RequestsConnectionError: except RequestsConnectionError:
logger.error( logger.error(
f"Unable to connect to NetBox with URL {netbox_host}." "Unable to connect to NetBox with URL %s. Please check the URL and status of NetBox.",
" Please check the URL and status of NetBox." netbox_host,
) )
sys.exit(1) sys.exit(1)
except NBRequestError as e: except NBRequestError as e:
logger.error(f"NetBox error: {e}") logger.error("NetBox error: %s", e)
sys.exit(1) sys.exit(1)
# Check if the provided Hostgroup layout is valid # Check if the provided Hostgroup layout is valid
device_cfs = [] device_cfs = []
vm_cfs = [] vm_cfs = []
device_cfs = list( device_cfs = list(
netbox.extras.custom_fields.filter(type="text", content_types="dcim.device") netbox.extras.custom_fields.filter(type=["text","object","select"],
content_types="dcim.device")
)
verify_hg_format(
config["hostgroup_format"], device_cfs=device_cfs, hg_type="dev", logger=logger
) )
verify_hg_format(config["hostgroup_format"],
device_cfs=device_cfs, hg_type="dev", logger=logger)
if config["sync_vms"]: if config["sync_vms"]:
vm_cfs = list( vm_cfs = list(
netbox.extras.custom_fields.filter(type="text", netbox.extras.custom_fields.filter(
content_types="virtualization.virtualmachine") type=["text","object","select"],
content_types="virtualization.virtualmachine"
)
)
verify_hg_format(
config["vm_hostgroup_format"], vm_cfs=vm_cfs, hg_type="vm", logger=logger
) )
verify_hg_format(config["vm_hostgroup_format"], vm_cfs=vm_cfs, hg_type="vm", logger=logger)
# Set Zabbix API # Set Zabbix API
try: try:
ssl_ctx = ssl.create_default_context() ssl_ctx = ssl.create_default_context()
@ -120,7 +127,8 @@ def main(arguments):
netbox_vms = [] netbox_vms = []
if config["sync_vms"]: if config["sync_vms"]:
netbox_vms = list( netbox_vms = list(
netbox.virtualization.virtual_machines.filter(**config["nb_vm_filter"])) netbox.virtualization.virtual_machines.filter(**config["nb_vm_filter"])
)
netbox_site_groups = convert_recordset((netbox.dcim.site_groups.all())) netbox_site_groups = convert_recordset((netbox.dcim.site_groups.all()))
netbox_regions = convert_recordset(netbox.dcim.regions.all()) netbox_regions = convert_recordset(netbox.dcim.regions.all())
netbox_journals = netbox.extras.journal_entries netbox_journals = netbox.extras.journal_entries
@ -141,18 +149,28 @@ def main(arguments):
# Go through all NetBox devices # Go through all NetBox devices
for nb_vm in netbox_vms: for nb_vm in netbox_vms:
try: try:
vm = VirtualMachine(nb_vm, zabbix, netbox_journals, nb_version, vm = VirtualMachine(
config["create_journal"], logger) nb_vm,
logger.debug(f"Host {vm.name}: started operations on VM.") zabbix,
netbox_journals,
nb_version,
config["create_journal"],
logger,
)
logger.debug("Host %s: Started operations on VM.", vm.name)
vm.set_vm_template() vm.set_vm_template()
# Check if a valid template has been found for this VM. # Check if a valid template has been found for this VM.
if not vm.zbx_template_names: if not vm.zbx_template_names:
continue continue
vm.set_hostgroup(config["vm_hostgroup_format"], vm.set_hostgroup(
netbox_site_groups, netbox_regions) config["vm_hostgroup_format"], netbox_site_groups, netbox_regions
)
# Check if a valid hostgroup has been found for this VM. # Check if a valid hostgroup has been found for this VM.
if not vm.hostgroups: if not vm.hostgroups:
continue continue
if config["extended_site_properties"] and nb_vm.site:
logger.debug("VM %s: extending site information.", vm.name)
vm.site=(convert_recordset(netbox.dcim.sites.filter(id=nb_vm.site.id)))
vm.set_inventory(nb_vm) vm.set_inventory(nb_vm)
vm.set_usermacros() vm.set_usermacros()
vm.set_tags() vm.set_tags()
@ -162,13 +180,12 @@ def main(arguments):
# Delete device from Zabbix # Delete device from Zabbix
# and remove hostID from NetBox. # and remove hostID from NetBox.
vm.cleanup() vm.cleanup()
logger.debug(f"VM {vm.name}: cleanup complete") logger.info("VM %s: cleanup complete", vm.name)
continue continue
# Device has been added to NetBox # Device has been added to NetBox
# but is not in Activate state # but is not in Activate state
logger.info( logger.info(
f"VM {vm.name}: skipping since this VM is " "VM %s: Skipping since this VM is not in the active state.", vm.name
f"not in the active state."
) )
continue continue
# Check if the VM is in the disabled state # Check if the VM is in the disabled state
@ -200,19 +217,35 @@ def main(arguments):
for nb_device in netbox_devices: for nb_device in netbox_devices:
try: try:
# Set device instance set data such as hostgroup and template information. # Set device instance set data such as hostgroup and template information.
device = PhysicalDevice(nb_device, zabbix, netbox_journals, nb_version, device = PhysicalDevice(
config["create_journal"], logger) nb_device,
logger.debug(f"Host {device.name}: started operations on device.") zabbix,
device.set_template(config["templates_config_context"], netbox_journals,
config["templates_config_context_overrule"]) nb_version,
config["create_journal"],
logger,
)
logger.debug("Host %s: Started operations on device.", device.name)
device.set_template(
config["templates_config_context"],
config["templates_config_context_overrule"],
)
# Check if a valid template has been found for this VM. # Check if a valid template has been found for this VM.
if not device.zbx_template_names: if not device.zbx_template_names:
continue continue
device.set_hostgroup( device.set_hostgroup(
config["hostgroup_format"], netbox_site_groups, netbox_regions) config["hostgroup_format"], netbox_site_groups, netbox_regions
)
# Check if a valid hostgroup has been found for this VM. # Check if a valid hostgroup has been found for this VM.
if not device.hostgroups: if not device.hostgroups:
logger.warning(
"Host %s: Host has no valid hostgroups, Skipping this host...",
device.name,
)
continue continue
if config["extended_site_properties"] and nb_device.site:
logger.debug("Device %s: extending site information.", device.name)
device.site=(convert_recordset(netbox.dcim.sites.filter(id=nb_device.site.id)))
device.set_inventory(nb_device) device.set_inventory(nb_device)
device.set_usermacros() device.set_usermacros()
device.set_tags() device.set_tags()
@ -221,16 +254,16 @@ def main(arguments):
if device.isCluster() and config["clustering"]: if device.isCluster() and config["clustering"]:
# Check if device is primary or secondary # Check if device is primary or secondary
if device.promoteMasterDevice(): if device.promoteMasterDevice():
e = f"Device {device.name}: is " f"part of cluster and primary." logger.info(
logger.info(e) "Device %s: is part of cluster and primary.", device.name
)
else: else:
# Device is secondary in cluster. # Device is secondary in cluster.
# Don't continue with this device. # Don't continue with this device.
e = ( logger.info(
f"Device {device.name}: is part of cluster " "Device %s: Is part of cluster but not primary. Skipping this host...",
f"but not primary. Skipping this host..." device.name,
) )
logger.info(e)
continue continue
# Checks if device is in cleanup state # Checks if device is in cleanup state
if device.status in config["zabbix_device_removal"]: if device.status in config["zabbix_device_removal"]:
@ -238,13 +271,13 @@ def main(arguments):
# Delete device from Zabbix # Delete device from Zabbix
# and remove hostID from NetBox. # and remove hostID from NetBox.
device.cleanup() device.cleanup()
logger.info(f"Device {device.name}: cleanup complete") logger.info("Device %s: cleanup complete", device.name)
continue continue
# Device has been added to NetBox # Device has been added to NetBox
# but is not in Activate state # but is not in Activate state
logger.info( logger.info(
f"Device {device.name}: skipping since this device is " "Device %s: Skipping since this device is not in the active state.",
f"not in the active state." device.name,
) )
continue continue
# Check if the device is in the disabled state # Check if the device is in the disabled state
@ -280,7 +313,7 @@ if __name__ == "__main__":
description="A script to sync Zabbix with NetBox device data." description="A script to sync Zabbix with NetBox device data."
) )
parser.add_argument( parser.add_argument(
"-v", "--verbose", help="Turn on debugging.", action="store_true" "-v", "--verbose", help="Turn on verbose logging.", action="store_true"
) )
parser.add_argument( parser.add_argument(
"-vv", "--debug", help="Turn on debugging.", action="store_true" "-vv", "--debug", help="Turn on debugging.", action="store_true"

View File

@ -163,10 +163,6 @@ class TestHostgroups(unittest.TestCase):
"""Test different hostgroup formats for devices.""" """Test different hostgroup formats for devices."""
hostgroup = Hostgroup("dev", self.mock_device, "4.0", self.mock_logger) hostgroup = Hostgroup("dev", self.mock_device, "4.0", self.mock_logger)
# Default format: site/manufacturer/role
default_result = hostgroup.generate()
self.assertEqual(default_result, "TestSite/TestManufacturer/TestRole")
# Custom format: site/region # Custom format: site/region
custom_result = hostgroup.generate("site/region") custom_result = hostgroup.generate("site/region")
self.assertEqual(custom_result, "TestSite/TestRegion") self.assertEqual(custom_result, "TestSite/TestRegion")
@ -180,7 +176,7 @@ class TestHostgroups(unittest.TestCase):
hostgroup = Hostgroup("vm", self.mock_vm, "4.0", self.mock_logger) hostgroup = Hostgroup("vm", self.mock_vm, "4.0", self.mock_logger)
# Default format: cluster/role # Default format: cluster/role
default_result = hostgroup.generate() default_result = hostgroup.generate("cluster/role")
self.assertEqual(default_result, "TestCluster/TestRole") self.assertEqual(default_result, "TestCluster/TestRole")
# Custom format: site/tenant # Custom format: site/tenant
@ -251,7 +247,7 @@ class TestHostgroups(unittest.TestCase):
hostgroup = Hostgroup("dev", minimal_device, "4.0", self.mock_logger) hostgroup = Hostgroup("dev", minimal_device, "4.0", self.mock_logger)
# Generate with default format # Generate with default format
result = hostgroup.generate() result = hostgroup.generate("site/manufacturer/role")
# Site is missing, so only manufacturer and role should be included # Site is missing, so only manufacturer and role should be included
self.assertEqual(result, "MinimalManufacturer/MinimalRole") self.assertEqual(result, "MinimalManufacturer/MinimalRole")
@ -259,24 +255,6 @@ class TestHostgroups(unittest.TestCase):
with self.assertRaises(HostgroupError): with self.assertRaises(HostgroupError):
hostgroup.generate("site/nonexistent/role") hostgroup.generate("site/nonexistent/role")
def test_hostgroup_missing_required_attributes(self):
"""Test handling when no valid hostgroup can be generated."""
# Create a VM with minimal attributes that won't satisfy any format
minimal_vm = MagicMock()
minimal_vm.name = "minimal-vm"
minimal_vm.site = None
minimal_vm.tenant = None
minimal_vm.platform = None
minimal_vm.role = None
minimal_vm.cluster = None
minimal_vm.custom_fields = {}
hostgroup = Hostgroup("vm", minimal_vm, "4.0", self.mock_logger)
# With default format of cluster/role, both are None, so should raise an error
with self.assertRaises(HostgroupError):
hostgroup.generate()
def test_nested_region_hostgroups(self): def test_nested_region_hostgroups(self):
"""Test hostgroup generation with nested regions.""" """Test hostgroup generation with nested regions."""
# Mock the build_path function to return a predictable result # Mock the build_path function to return a predictable result
@ -335,6 +313,60 @@ class TestHostgroups(unittest.TestCase):
call.write('\n')] call.write('\n')]
mock_stdout.assert_has_calls(calls, any_order=True) mock_stdout.assert_has_calls(calls, any_order=True)
def test_vm_list_based_hostgroup_format(self):
"""Test VM hostgroup generation with a list-based format."""
hostgroup = Hostgroup("vm", self.mock_vm, "4.0", self.mock_logger)
# Test with a list of format strings
format_list = ["platform", "role", "cluster_type/cluster"]
# Generate hostgroups for each format in the list
hostgroups = []
for fmt in format_list:
result = hostgroup.generate(fmt)
if result: # Only add non-None results
hostgroups.append(result)
# Verify each expected hostgroup is generated
self.assertEqual(len(hostgroups), 3) # Should have 3 hostgroups
self.assertIn("TestPlatform", hostgroups)
self.assertIn("TestRole", hostgroups)
self.assertIn("TestClusterType/TestCluster", hostgroups)
def test_nested_format_splitting(self):
"""Test that formats with slashes correctly split and resolve each component."""
hostgroup = Hostgroup("vm", self.mock_vm, "4.0", self.mock_logger)
# Test a format with slashes that should be split
complex_format = "cluster_type/cluster"
result = hostgroup.generate(complex_format)
# Verify the format is correctly split and each component resolved
self.assertEqual(result, "TestClusterType/TestCluster")
def test_multiple_hostgroup_formats_device(self):
"""Test device hostgroup generation with multiple formats."""
hostgroup = Hostgroup("dev", self.mock_device, "4.0", self.mock_logger)
# Test with various formats that would be in a list
formats = [
"site",
"manufacturer/role",
"platform/location",
"tenant_group/tenant"
]
# Generate and check each format
results = {}
for fmt in formats:
results[fmt] = hostgroup.generate(fmt)
# Verify results
self.assertEqual(results["site"], "TestSite")
self.assertEqual(results["manufacturer/role"], "TestManufacturer/TestRole")
self.assertEqual(results["platform/location"], "TestPlatform/TestLocation")
self.assertEqual(results["tenant_group/tenant"], "TestTenantGroup/TestTenant")
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@ -0,0 +1,137 @@
"""Tests for list-based hostgroup formats in configuration."""
import unittest
from unittest.mock import MagicMock, patch
from modules.hostgroups import Hostgroup
from modules.exceptions import HostgroupError
from modules.tools import verify_hg_format
class TestListHostgroupFormats(unittest.TestCase):
"""Test class for list-based hostgroup format functionality."""
def setUp(self):
"""Set up test fixtures."""
# Create mock logger
self.mock_logger = MagicMock()
# Create mock device
self.mock_device = MagicMock()
self.mock_device.name = "test-device"
# Set up site information
site = MagicMock()
site.name = "TestSite"
# Set up region information
region = MagicMock()
region.name = "TestRegion"
region.__str__.return_value = "TestRegion"
site.region = region
# Set device site
self.mock_device.site = site
# Set up role information
self.mock_device_role = MagicMock()
self.mock_device_role.name = "TestRole"
self.mock_device_role.__str__.return_value = "TestRole"
self.mock_device.role = self.mock_device_role
# Set up rack information
rack = MagicMock()
rack.name = "TestRack"
self.mock_device.rack = rack
# Set up platform information
platform = MagicMock()
platform.name = "TestPlatform"
self.mock_device.platform = platform
# Device-specific properties
device_type = MagicMock()
manufacturer = MagicMock()
manufacturer.name = "TestManufacturer"
device_type.manufacturer = manufacturer
self.mock_device.device_type = device_type
# Create mock VM
self.mock_vm = MagicMock()
self.mock_vm.name = "test-vm"
# Reuse site from device
self.mock_vm.site = site
# Set up role for VM
self.mock_vm.role = self.mock_device_role
# Set up platform for VM
self.mock_vm.platform = platform
# VM-specific properties
cluster = MagicMock()
cluster.name = "TestCluster"
cluster_type = MagicMock()
cluster_type.name = "TestClusterType"
cluster.type = cluster_type
self.mock_vm.cluster = cluster
def test_verify_list_based_hostgroup_format(self):
"""Test verification of list-based hostgroup formats."""
# List format with valid items
valid_format = ["region", "site", "rack"]
# List format with nested path
valid_nested_format = ["region", "site/rack"]
# List format with invalid item
invalid_format = ["region", "invalid_item", "rack"]
# Should not raise exception for valid formats
verify_hg_format(valid_format, hg_type="dev", logger=self.mock_logger)
verify_hg_format(valid_nested_format, hg_type="dev", logger=self.mock_logger)
# Should raise exception for invalid format
with self.assertRaises(HostgroupError):
verify_hg_format(invalid_format, hg_type="dev", logger=self.mock_logger)
def test_simulate_hostgroup_generation_from_config(self):
"""Simulate how the main script would generate hostgroups from list-based config."""
# Mock configuration with list-based hostgroup format
config_format = ["region", "site", "rack"]
hostgroup = Hostgroup("dev", self.mock_device, "4.0", self.mock_logger)
# Simulate the main script's hostgroup generation process
hostgroups = []
for fmt in config_format:
result = hostgroup.generate(fmt)
if result:
hostgroups.append(result)
# Check results
self.assertEqual(len(hostgroups), 3)
self.assertIn("TestRegion", hostgroups)
self.assertIn("TestSite", hostgroups)
self.assertIn("TestRack", hostgroups)
def test_vm_hostgroup_format_from_config(self):
"""Test VM hostgroup generation with list-based format."""
# Mock VM configuration with mixed format
config_format = ["platform", "role", "cluster_type/cluster"]
hostgroup = Hostgroup("vm", self.mock_vm, "4.0", self.mock_logger)
# Simulate the main script's hostgroup generation process
hostgroups = []
for fmt in config_format:
result = hostgroup.generate(fmt)
if result:
hostgroups.append(result)
# Check results
self.assertEqual(len(hostgroups), 3)
self.assertIn("TestPlatform", hostgroups)
self.assertIn("TestRole", hostgroups)
self.assertIn("TestClusterType/TestCluster", hostgroups)
if __name__ == "__main__":
unittest.main()

View File

@ -88,7 +88,7 @@ class TestZabbixUsermacros(unittest.TestCase):
macros = ZabbixUsermacros(self.nb, {}, False, logger=self.logger) macros = ZabbixUsermacros(self.nb, {}, False, logger=self.logger)
result = macros.render_macro("{$FOO}", {"type": "text"}) result = macros.render_macro("{$FOO}", {"type": "text"})
self.assertFalse(result) self.assertFalse(result)
self.logger.warning.assert_called() self.logger.info.assert_called()
def test_render_macro_str(self): def test_render_macro_str(self):
macros = ZabbixUsermacros(self.nb, {}, False, logger=self.logger) macros = ZabbixUsermacros(self.nb, {}, False, logger=self.logger)
@ -102,7 +102,7 @@ class TestZabbixUsermacros(unittest.TestCase):
macros = ZabbixUsermacros(self.nb, {}, False, logger=self.logger) macros = ZabbixUsermacros(self.nb, {}, False, logger=self.logger)
result = macros.render_macro("FOO", "bar") result = macros.render_macro("FOO", "bar")
self.assertFalse(result) self.assertFalse(result)
self.logger.error.assert_called() self.logger.warning.assert_called()
def test_generate_from_map(self): def test_generate_from_map(self):
nb = DummyNB(memory="bar", role="baz") nb = DummyNB(memory="bar", role="baz")