40 Commits
3.1 ... 3.2.0

Author SHA1 Message Date
Twan Kamans
2fa05ffe92 Merge pull request #146 from TheNetworkGuy/develop
Some checks failed
Build and Push Docker Image / test_quality (push) Has been cancelled
Build and Push Docker Image / test_code (push) Has been cancelled
Build and Push Docker Image / build (push) Has been cancelled
Support for 7.4
2025-09-24 15:10:50 +02:00
TheNetworkGuy
b81d4abfcd Add support for Zabbix 7.4 2025-09-23 12:47:05 +02:00
Raymond Kuiper
c58a3e8dd5 Update README.md
Replaced dependency pyzabbix with zabbix-utils as this was changed a few months ago.
2025-06-26 09:48:25 +02:00
Raymond Kuiper
3e1657e575 Merge pull request #140 from retigra/hostgroup_static_text
 Hostgroup static text
2025-06-25 17:21:31 +02:00
Raymond Kuiper
161b310ba3 corrected linting error 2025-06-25 17:07:46 +02:00
Raymond Kuiper
cf2c841d23 Merge branch 'develop' into hostgroup_static_text 2025-06-25 17:06:37 +02:00
Raymond Kuiper
b258b02b91 Merge pull request #138 from retigra/issue-136
 Logging improvements
2025-06-25 17:00:58 +02:00
Raymond Kuiper
e82c098e26 corrected linting error 2025-06-25 17:00:04 +02:00
Raymond Kuiper
3910e0de2d Updated docs 2025-06-25 16:54:12 +02:00
Raymond Kuiper
98c13919c5 Added support for hardcoded strings in hostgroups 2025-06-25 16:50:17 +02:00
Wouter de Bruijn
e718560689 🚨 Line length fixes 2025-06-25 16:37:44 +02:00
Wouter de Bruijn
57c7f83e6a 🔊 Removed f-strings usage from logs 2025-06-25 13:56:41 +02:00
Raymond Kuiper
e0ec3c0632 updated usermacro test for new loglevels 2025-06-25 10:54:39 +02:00
Raymond Kuiper
e4a1a17ded Logging improvements 2025-06-25 10:43:47 +02:00
Twan Kamans
f15e53185b Merge pull request #137 from TheNetworkGuy/hostgroup_fixes2
Fixes bug for hostgroups and removed default values for hostgroups
2025-06-24 21:44:34 +02:00
TheNetworkGuy
5923682d48 Fixes workflows to be executed 2 times. 2025-06-24 21:42:46 +02:00
TheNetworkGuy
29a54e5a86 Removed unused hostgroup import since the hostgroup generate function function has been moved to devices.py 2025-06-24 21:29:36 +02:00
TheNetworkGuy
4a53b53789 Removed previous patch for Nonetype hostgroups and made a proper fix by refactoring the set_hostgroup() function and removing it from virtual_machines.py 2025-06-24 21:28:32 +02:00
TheNetworkGuy
6d4f1ac0a5 Added hostgroup tests 2025-06-24 21:28:13 +02:00
TheNetworkGuy
a522c98929 Removed default None for hg_format making a hostgroup format input required. 2025-06-24 20:50:04 +02:00
TheNetworkGuy
1de0b0781b Removed default for hostgroups and fixed bug for hostgroup attributes which do not exist 2025-06-24 20:44:59 +02:00
Raymond Kuiper
1cf24fbcb5 Merge pull request #135 from retigra/issue-131
🐛 Fixes for issue #131
2025-06-24 17:52:13 +02:00
Raymond Kuiper
c2b25e0cd2 fixed linting 2025-06-24 17:35:10 +02:00
Raymond Kuiper
9933c97e94 improved debug logging 2025-06-24 17:28:57 +02:00
Raymond Kuiper
435fd1fa78 Fixed issues with tag mapping 2025-06-24 17:09:23 +02:00
Raymond Kuiper
099ebcace5 Merge pull request #134 from retigra/issue-130
🐛 Fixes for issue #130
2025-06-24 16:02:36 +02:00
Raymond Kuiper
906c719863 corrected linting errors 2025-06-24 15:16:39 +02:00
Raymond Kuiper
2a3d586302 corrected typo 2025-06-24 15:06:52 +02:00
Raymond Kuiper
753633e7d2 Added checks for empty list of hostgroups, improved some logging 2025-06-24 15:01:45 +02:00
Raymond Kuiper
de82d5ac71 Remove duplicates from the list of hostgroups 2025-06-24 13:52:43 +02:00
Raymond Kuiper
9912f24450 Merge pull request #3 from TheNetworkGuy/main
Sync with upstream
2025-06-24 11:58:31 +02:00
Twan Kamans
d056a20de2 Merge pull request #128 from TheNetworkGuy/develop
Fixes #127, implements some tests to prevent hostgroup failures.
2025-06-17 09:06:30 +02:00
TheNetworkGuy
a57b51870f Merge branch 'develop' of github.com:TheNetworkGuy/netbox-zabbix-sync into develop 2025-06-17 08:47:49 +02:00
TheNetworkGuy
dbc7acaf98 Added hostgroup tests, set the test coverage to 70%, added test packages to devcontainer 2025-06-16 18:40:06 +00:00
TheNetworkGuy
87b33706c0 Updated README with cluster_type 2025-06-16 16:07:38 +00:00
TheNetworkGuy
affd4c6998 Fixes #127 2025-06-16 16:03:53 +00:00
Twan Kamans
22982c3607 Merge pull request #126 from TheNetworkGuy/develop
Fixes bug in which config.py was not detected by the script
2025-06-16 17:21:03 +02:00
TheNetworkGuy
dec2cf6996 Fixed bug in which custom config.py module was not accessed 2025-06-16 14:04:10 +00:00
TheNetworkGuy
940f2d6afb Re-added some git logic to the pipeline which was lost during development 2025-06-16 11:13:36 +00:00
TheNetworkGuy
d79f96a5b4 Add unittests to build process 2025-06-16 10:03:58 +00:00
17 changed files with 874 additions and 265 deletions

View File

@@ -12,7 +12,7 @@
// "forwardPorts": [],
// Use 'postCreateCommand' to run commands after the container is created.
"postCreateCommand": "pip3 install --user -r requirements.txt && pip3 install --user pylint pytest"
"postCreateCommand": "pip3 install --user -r requirements.txt && pip3 install --user pylint pytest coverage pytest-cov"
// Configure tool-specific properties.
// "customizations": {},

View File

@@ -5,10 +5,16 @@ on:
push:
branches:
- main
release:
types: [published]
pull_request:
types: [opened, synchronize]
jobs:
test_quality:
uses: ./.github/workflows/quality.yml
uses: ./.github/workflows/quality.yml
test_code:
uses: ./.github/workflows/run_tests.yml
build:
runs-on: ubuntu-latest
steps:

View File

@@ -2,7 +2,6 @@
name: Pylint Quality control
on:
push:
pull_request:
workflow_call:

View File

@@ -2,8 +2,8 @@
name: Pytest code testing
on:
push:
pull_request:
workflow_call:
jobs:
test_code:
@@ -29,4 +29,4 @@ jobs:
coverage run -m pytest tests
- name: Check coverage percentage
run: |
coverage report --fail-under=60
coverage report --fail-under=70

View File

@@ -51,7 +51,7 @@ pip.
```sh
# Packages:
pynetbox
pyzabbix
zabbix-utils
# Install them through requirements.txt from a venv:
virtualenv .venv
@@ -185,10 +185,11 @@ used:
**Only for VMs**
| name | description |
| ------------ | --------------- |
| cluster | VM cluster name |
| device | parent device |
| name | description |
| ------------ | --------------- |
| cluster | VM cluster name |
| cluster_type | VM cluster type |
| device | parent device |
You can specify the value separated by a "/" like so:
@@ -211,6 +212,17 @@ in `config.py` the script will render a full region path of all parent regions
for the hostgroup name. `traverse_site_groups` controls the same behaviour for
site_groups.
**Hardcoded text**
You can add hardcoded text in the hostgroup format by using quotes, this will
insert the literal text:
```python
hostgroup_format = "'MyDevices'/location/role"
```
In this case, the prefix MyDevices will be used for all generated groups.
**Custom fields**
You can use the value of custom fields for hostgroup generation. This allows

View File

@@ -3,7 +3,7 @@ Module for parsing configuration from the top level config.py file
"""
from pathlib import Path
from importlib import util
from os import environ
from os import environ, path
from logging import getLogger
logger = getLogger(__name__)
@@ -104,18 +104,25 @@ def load_env_variable(config_environvar):
def load_config_file(config_default, config_file="config.py"):
"""Returns config from config.py file"""
# Check if config.py exists and load it
# If it does not exist, return the default config
config_path = Path(config_file)
if config_path.exists():
dconf = config_default.copy()
# Dynamically import the config module
spec = util.spec_from_file_location("config", config_path)
config_module = util.module_from_spec(spec)
spec.loader.exec_module(config_module)
# Update DEFAULT_CONFIG with variables from the config module
for key in dconf:
if hasattr(config_module, key):
dconf[key] = getattr(config_module, key)
return dconf
return config_default
# Find the script path and config file next to it.
script_dir = path.dirname(path.dirname(path.abspath(__file__)))
config_path = Path(path.join(script_dir, config_file))
# If the script directory is not found, try the current working directory
if not config_path.exists():
config_path = Path(config_file)
# If both checks fail then fallback to the default config
if not config_path.exists():
return config_default
dconf = config_default.copy()
# Dynamically import the config module
spec = util.spec_from_file_location("config", config_path)
config_module = util.module_from_spec(spec)
spec.loader.exec_module(config_module)
# Update DEFAULT_CONFIG with variables from the config module
for key in dconf:
if hasattr(config_module, key):
dconf[key] = getattr(config_module, key)
return dconf

View File

@@ -26,6 +26,7 @@ from modules.config import load_config
config = load_config()
class PhysicalDevice:
# pylint: disable=too-many-instance-attributes, too-many-arguments, too-many-positional-arguments
"""
@@ -48,6 +49,7 @@ class PhysicalDevice:
self.zbx_template_names = []
self.zbx_templates = []
self.hostgroups = []
self.hostgroup_type = "dev"
self.tenant = nb.tenant
self.config_context = nb.config_context
self.zbxproxy = None
@@ -96,8 +98,8 @@ class PhysicalDevice:
if config["device_cf"] in self.nb.custom_fields:
self.zabbix_id = self.nb.custom_fields[config["device_cf"]]
else:
e = f'Host {self.name}: Custom field {config["device_cf"]} not present'
self.logger.warning(e)
e = f"Host {self.name}: Custom field {config['device_cf']} not present"
self.logger.error(e)
raise SyncInventoryError(e)
# Validate hostname format.
@@ -110,9 +112,11 @@ class PhysicalDevice:
self.visible_name = self.nb.name
self.use_visible_name = True
self.logger.info(
f"Host {self.visible_name} contains special characters. "
f"Using {self.name} as name for the NetBox object "
f"and using {self.visible_name} as visible name in Zabbix."
"Host %s contains special characters."
"Using %s as name for the NetBox object and using %s as visible name in Zabbix.",
self.visible_name,
self.name,
self.visible_name,
)
else:
pass
@@ -121,12 +125,12 @@ class PhysicalDevice:
"""Set the hostgroup for this device"""
# Create new Hostgroup instance
hg = Hostgroup(
"dev",
self.hostgroup_type,
self.nb,
self.nb_api_version,
logger=self.logger,
nested_sitegroup_flag=config['traverse_site_groups'],
nested_region_flag=config['traverse_regions'],
nested_sitegroup_flag=config["traverse_site_groups"],
nested_region_flag=config["traverse_regions"],
nb_groups=nb_site_groups,
nb_regions=nb_regions,
)
@@ -135,6 +139,14 @@ class PhysicalDevice:
self.hostgroups = [hg.generate(f) for f in hg_format]
else:
self.hostgroups.append(hg.generate(hg_format))
# Remove duplicates and None values
self.hostgroups = list(filter(None, list(set(self.hostgroups))))
if self.hostgroups:
self.logger.debug(
"Host %s: Should be member of groups: %s", self.name, self.hostgroups
)
return True
return False
def set_template(self, prefer_config_context, overrule_custom):
"""Set Template"""
@@ -177,8 +189,6 @@ class PhysicalDevice:
self.logger.warning(e)
raise TemplateError(e)
def get_templates_context(self):
"""Get Zabbix templates from the device context"""
if "zabbix" not in self.config_context:
@@ -203,9 +213,11 @@ class PhysicalDevice:
# Set inventory mode. Default is disabled (see class init function).
if config["inventory_mode"] == "disabled":
if config["inventory_sync"]:
self.logger.error(f"Host {self.name}: Unable to map NetBox inventory to Zabbix. "
"Inventory sync is enabled in "
"config but inventory mode is disabled.")
self.logger.error(
"Host %s: Unable to map NetBox inventory to Zabbix."
"Inventory sync is enabled in config but inventory mode is disabled",
self.name,
)
return True
if config["inventory_mode"] == "manual":
self.inventory_mode = 0
@@ -213,16 +225,20 @@ class PhysicalDevice:
self.inventory_mode = 1
else:
self.logger.error(
f"Host {self.name}: Specified value for inventory mode in"
f" config is not valid. Got value {config['inventory_mode']}"
"Host %s: Specified value for inventory mode in config is not valid. Got value %s",
self.name,
config["inventory_mode"],
)
return False
self.inventory = {}
if config["inventory_sync"] and self.inventory_mode in [0, 1]:
self.logger.debug(f"Host {self.name}: Starting inventory mapper")
self.logger.debug("Host %s: Starting inventory mapper.", self.name)
self.inventory = field_mapper(
self.name, self._inventory_map(), nbdevice, self.logger
)
self.logger.debug(
"Host %s: Resolved inventory: %s", self.name, self.inventory
)
return True
def isCluster(self):
@@ -240,14 +256,14 @@ class PhysicalDevice:
f"Unable to proces {self.name} for cluster calculation: "
f"not part of a cluster."
)
self.logger.warning(e)
self.logger.info(e)
raise SyncInventoryError(e)
if not self.nb.virtual_chassis.master:
e = (
f"{self.name} is part of a NetBox virtual chassis which does "
"not have a master configured. Skipping for this reason."
)
self.logger.error(e)
self.logger.warning(e)
raise SyncInventoryError(e)
return self.nb.virtual_chassis.master.id
@@ -259,14 +275,15 @@ class PhysicalDevice:
"""
masterid = self.getClusterMaster()
if masterid == self.id:
self.logger.debug(
f"Host {self.name} is primary cluster member. "
f"Modifying hostname from {self.name} to "
+ f"{self.nb.virtual_chassis.name}."
self.logger.info(
"Host %s is primary cluster member. Modifying hostname from %s to %s.",
self.name,
self.name,
self.nb.virtual_chassis.name,
)
self.name = self.nb.virtual_chassis.name
return True
self.logger.debug(f"Host {self.name} is non-primary cluster member.")
self.logger.info("Host %s is non-primary cluster member.", self.name)
return False
def zbxTemplatePrepper(self, templates):
@@ -278,7 +295,7 @@ class PhysicalDevice:
# Check if there are templates defined
if not self.zbx_template_names:
e = f"Host {self.name}: No templates found"
self.logger.info(e)
self.logger.warning(e)
raise SyncInventoryError()
# Set variable to empty list
self.zbx_templates = []
@@ -298,7 +315,10 @@ class PhysicalDevice:
"name": zbx_template["name"],
}
)
e = f"Host {self.name}: found template {zbx_template['name']}"
e = (
f"Host {self.name}: Found template '{zbx_template['name']}' "
f"(ID:{zbx_template['templateid']})"
)
self.logger.debug(e)
# Return error should the template not be found in Zabbix
if not template_match:
@@ -321,8 +341,8 @@ class PhysicalDevice:
if group["name"] == hg:
self.group_ids.append({"groupid": group["groupid"]})
e = (
f"Host {self.name}: matched group "
f"\"{group['name']}\" (ID:{group['groupid']})"
f"Host {self.name}: Matched group "
f'"{group["name"]}" (ID:{group["groupid"]})'
)
self.logger.debug(e)
if len(self.group_ids) == len(self.hostgroups):
@@ -403,7 +423,7 @@ class PhysicalDevice:
macros = ZabbixUsermacros(
self.nb,
self._usermacro_map(),
config['usermacro_sync'],
config["usermacro_sync"],
logger=self.logger,
host=self.name,
)
@@ -421,16 +441,16 @@ class PhysicalDevice:
tags = ZabbixTags(
self.nb,
self._tag_map(),
config['tag_sync'],
config['tag_lower'],
tag_name=config['tag_name'],
tag_value=config['tag_value'],
tag_sync=config["tag_sync"],
tag_lower=config["tag_lower"],
tag_name=config["tag_name"],
tag_value=config["tag_value"],
logger=self.logger,
host=self.name,
)
if tags.sync is False:
if config["tag_sync"] is False:
self.tags = []
return False
self.tags = tags.generate()
return True
@@ -468,12 +488,12 @@ class PhysicalDevice:
# If the proxy name matches
if proxy["name"] == proxy_name:
self.logger.debug(
f"Host {self.name}: using {proxy['type']}" f" {proxy_name}"
"Host %s: using {proxy['type']} '%s'", self.name, proxy_name
)
self.zbxproxy = proxy
return True
self.logger.warning(
f"Host {self.name}: unable to find proxy {proxy_name}"
"Host %s: unable to find proxy %s", self.name, proxy_name
)
return False
@@ -503,7 +523,6 @@ class PhysicalDevice:
templateids.append({"templateid": template["templateid"]})
# Set interface, group and template configuration
interfaces = self.setInterfaceDetails()
groups = self.group_ids
# Set Zabbix proxy if defined
self.setProxy(proxies)
# Set basic data for host creation
@@ -512,7 +531,7 @@ class PhysicalDevice:
"name": self.visible_name,
"status": self.zabbix_state,
"interfaces": interfaces,
"groups": groups,
"groups": self.group_ids,
"templates": templateids,
"description": description,
"inventory_mode": self.inventory_mode,
@@ -541,12 +560,12 @@ class PhysicalDevice:
# Set NetBox custom field to hostID value.
self.nb.custom_fields[config["device_cf"]] = int(self.zabbix_id)
self.nb.save()
msg = f"Host {self.name}: Created host in Zabbix."
msg = f"Host {self.name}: Created host in Zabbix. (ID:{self.zabbix_id})"
self.logger.info(msg)
self.create_journal_entry("success", msg)
else:
self.logger.error(
f"Host {self.name}: Unable to add to Zabbix. Host already present."
"Host %s: Unable to add to Zabbix. Host already present.", self.name
)
def createZabbixHostgroup(self, hostgroups):
@@ -604,7 +623,9 @@ class PhysicalDevice:
)
self.logger.error(e)
raise SyncExternalError(e) from None
self.logger.info(f"Host {self.name}: updated with data {sanatize_log_output(kwargs)}.")
self.logger.info(
"Host %s: updated with data %s.", self.name, sanatize_log_output(kwargs)
)
self.create_journal_entry("info", "Updated host in Zabbix with latest NB data.")
def ConsistencyCheck(
@@ -615,7 +636,7 @@ class PhysicalDevice:
Checks if Zabbix object is still valid with NetBox parameters.
"""
# If group is found or if the hostgroup is nested
if not self.setZabbixGroupID(groups): # or len(self.hostgroups.split("/")) > 1:
if not self.setZabbixGroupID(groups): # or len(self.hostgroups.split("/")) > 1:
if create_hostgroups:
# Script is allowed to create a new hostgroup
new_groups = self.createZabbixHostgroup(groups)
@@ -632,8 +653,6 @@ class PhysicalDevice:
)
self.logger.warning(e)
raise SyncInventoryError(e)
#if self.group_ids:
# self.group_ids.append(self.pri_group_id)
# Prepare templates and proxy config
self.zbxTemplatePrepper(templates)
@@ -666,28 +685,30 @@ class PhysicalDevice:
raise SyncInventoryError(e)
host = host[0]
if host["host"] == self.name:
self.logger.debug(f"Host {self.name}: hostname in-sync.")
self.logger.debug("Host %s: Hostname in-sync.", self.name)
else:
self.logger.warning(
f"Host {self.name}: hostname OUT of sync. "
f"Received value: {host['host']}"
self.logger.info(
"Host %s: Hostname OUT of sync. Received value: %s",
self.name,
host["host"],
)
self.updateZabbixHost(host=self.name)
# Execute check depending on wether the name is special or not
if self.use_visible_name:
if host["name"] == self.visible_name:
self.logger.debug(f"Host {self.name}: visible name in-sync.")
self.logger.debug("Host %s: Visible name in-sync.", self.name)
else:
self.logger.warning(
f"Host {self.name}: visible name OUT of sync."
f" Received value: {host['name']}"
self.logger.info(
"Host %s: Visible name OUT of sync. Received value: %s",
self.name,
host["name"],
)
self.updateZabbixHost(name=self.visible_name)
# Check if the templates are in-sync
if not self.zbx_template_comparer(host["parentTemplates"]):
self.logger.warning(f"Host {self.name}: template(s) OUT of sync.")
self.logger.info("Host %s: Template(s) OUT of sync.", self.name)
# Prepare Templates for API parsing
templateids = []
for template in self.zbx_templates:
@@ -697,38 +718,41 @@ class PhysicalDevice:
templates_clear=host["parentTemplates"], templates=templateids
)
else:
self.logger.debug(f"Host {self.name}: template(s) in-sync.")
self.logger.debug("Host %s: Template(s) in-sync.", self.name)
# Check if Zabbix version is 6 or higher. Issue #93
group_dictname = "hostgroups"
if str(self.zabbix.version).startswith(("6", "5")):
group_dictname = "groups"
# Check if hostgroups match
if (sorted(host[group_dictname], key=itemgetter('groupid')) ==
sorted(self.group_ids, key=itemgetter('groupid'))):
self.logger.debug(f"Host {self.name}: hostgroups in-sync.")
if sorted(host[group_dictname], key=itemgetter("groupid")) == sorted(
self.group_ids, key=itemgetter("groupid")
):
self.logger.debug("Host %s: Hostgroups in-sync.", self.name)
else:
self.logger.warning(f"Host {self.name}: hostgroups OUT of sync.")
self.logger.info("Host %s: Hostgroups OUT of sync.", self.name)
self.updateZabbixHost(groups=self.group_ids)
if int(host["status"]) == self.zabbix_state:
self.logger.debug(f"Host {self.name}: status in-sync.")
self.logger.debug("Host %s: Status in-sync.", self.name)
else:
self.logger.warning(f"Host {self.name}: status OUT of sync.")
self.logger.info("Host %s: Status OUT of sync.", self.name)
self.updateZabbixHost(status=str(self.zabbix_state))
# Check if a proxy has been defined
if self.zbxproxy:
# Check if proxy or proxy group is defined
if (self.zbxproxy["idtype"] in host and
host[self.zbxproxy["idtype"]] == self.zbxproxy["id"]):
self.logger.debug(f"Host {self.name}: proxy in-sync.")
if (
self.zbxproxy["idtype"] in host
and host[self.zbxproxy["idtype"]] == self.zbxproxy["id"]
):
self.logger.debug("Host %s: Proxy in-sync.", self.name)
# Backwards compatibility for Zabbix <= 6
elif "proxy_hostid" in host and host["proxy_hostid"] == self.zbxproxy["id"]:
self.logger.debug(f"Host {self.name}: proxy in-sync.")
self.logger.debug("Host %s: Proxy in-sync.", self.name)
# Proxy does not match, update Zabbix
else:
self.logger.warning(f"Host {self.name}: proxy OUT of sync.")
self.logger.info("Host %s: Proxy OUT of sync.", self.name)
# Zabbix <= 6 patch
if not str(self.zabbix.version).startswith("7"):
self.updateZabbixHost(proxy_hostid=self.zbxproxy["id"])
@@ -751,8 +775,9 @@ class PhysicalDevice:
if proxy_power and proxy_set:
# Zabbix <= 6 fix
self.logger.warning(
f"Host {self.name}: no proxy is configured in NetBox "
"but is configured in Zabbix. Removing proxy config in Zabbix"
"Host %s: No proxy is configured in NetBox but is configured in Zabbix."
"Removing proxy config in Zabbix",
self.name,
)
if "proxy_hostid" in host and bool(host["proxy_hostid"]):
self.updateZabbixHost(proxy_hostid=0)
@@ -765,60 +790,61 @@ class PhysicalDevice:
# Checks if a proxy has been defined in Zabbix and if proxy_power config has been set
if proxy_set and not proxy_power:
# Display error message
self.logger.error(
f"Host {self.name} is configured "
f"with proxy in Zabbix but not in NetBox. The"
" -p flag was ommited: no "
"changes have been made."
self.logger.warning(
"Host %s: Is configured with proxy in Zabbix but not in NetBox."
"The -p flag was ommited: no changes have been made.",
self.name,
)
if not proxy_set:
self.logger.debug(f"Host {self.name}: proxy in-sync.")
self.logger.debug("Host %s: Proxy in-sync.", self.name)
# Check host inventory mode
if str(host["inventory_mode"]) == str(self.inventory_mode):
self.logger.debug(f"Host {self.name}: inventory_mode in-sync.")
self.logger.debug("Host %s: inventory_mode in-sync.", self.name)
else:
self.logger.warning(f"Host {self.name}: inventory_mode OUT of sync.")
self.logger.info("Host %s: inventory_mode OUT of sync.", self.name)
self.updateZabbixHost(inventory_mode=str(self.inventory_mode))
if config["inventory_sync"] and self.inventory_mode in [0, 1]:
# Check host inventory mapping
if host["inventory"] == self.inventory:
self.logger.debug(f"Host {self.name}: inventory in-sync.")
self.logger.debug("Host %s: Inventory in-sync.", self.name)
else:
self.logger.warning(f"Host {self.name}: inventory OUT of sync.")
self.logger.info("Host %s: Inventory OUT of sync.", self.name)
self.updateZabbixHost(inventory=self.inventory)
# Check host usermacros
if config['usermacro_sync']:
if config["usermacro_sync"]:
# Make a full copy synce we dont want to lose the original value
# of secret type macros from Netbox
netbox_macros = deepcopy(self.usermacros)
# Set the sync bit
full_sync_bit = bool(str(config['usermacro_sync']).lower() == "full")
full_sync_bit = bool(str(config["usermacro_sync"]).lower() == "full")
for macro in netbox_macros:
# If the Macro is a secret and full sync is NOT activated
if macro["type"] == str(1) and not full_sync_bit:
# Remove the value as the Zabbix api does not return the value key
# This is required when you want to do a diff between both lists
macro.pop("value")
# Sort all lists
def filter_with_macros(macro):
return macro["macro"]
host["macros"].sort(key=filter_with_macros)
netbox_macros.sort(key=filter_with_macros)
# Check if both lists are the same
if host["macros"] == netbox_macros:
self.logger.debug(f"Host {self.name}: usermacros in-sync.")
self.logger.debug("Host %s: Usermacros in-sync.", self.name)
else:
self.logger.warning(f"Host {self.name}: usermacros OUT of sync.")
self.logger.info("Host %s: Usermacros OUT of sync.", self.name)
# Update Zabbix with NetBox usermacros
self.updateZabbixHost(macros=self.usermacros)
# Check host tags
if config['tag_sync']:
if config["tag_sync"]:
if remove_duplicates(host["tags"], sortkey="tag") == self.tags:
self.logger.debug(f"Host {self.name}: tags in-sync.")
self.logger.debug("Host %s: Tags in-sync.", self.name)
else:
self.logger.warning(f"Host {self.name}: tags OUT of sync.")
self.logger.info("Host %s: Tags OUT of sync.", self.name)
self.updateZabbixHost(tags=self.tags)
# If only 1 interface has been found
@@ -856,11 +882,11 @@ class PhysicalDevice:
updates[key] = item
if updates:
# If interface updates have been found: push to Zabbix
self.logger.warning(f"Host {self.name}: Interface OUT of sync.")
self.logger.info("Host %s: Interface OUT of sync.", self.name)
if "type" in updates:
# Changing interface type not supported. Raise exception.
e = (
f"Host {self.name}: changing interface type to "
f"Host {self.name}: Changing interface type to "
f"{str(updates['type'])} is not supported."
)
self.logger.error(e)
@@ -870,26 +896,27 @@ class PhysicalDevice:
try:
# API call to Zabbix
self.zabbix.hostinterface.update(updates)
e = (f"Host {self.name}: updated interface "
f"with data {sanatize_log_output(updates)}.")
self.logger.info(e)
self.create_journal_entry("info", e)
err_msg = (
f"Host {self.name}: Updated interface "
f"with data {sanatize_log_output(updates)}."
)
self.logger.info(err_msg)
self.create_journal_entry("info", err_msg)
except APIRequestError as e:
msg = f"Zabbix returned the following error: {str(e)}."
self.logger.error(msg)
raise SyncExternalError(msg) from e
else:
# If no updates are found, Zabbix interface is in-sync
e = f"Host {self.name}: interface in-sync."
self.logger.debug(e)
self.logger.debug("Host %s: Interface in-sync.", self.name)
else:
e = (
f"Host {self.name} has unsupported interface configuration."
err_msg = (
f"Host {self.name}: Has unsupported interface configuration."
f" Host has total of {len(host['interfaces'])} interfaces. "
"Manual intervention required."
)
self.logger.error(e)
raise SyncInventoryError(e)
self.logger.error(err_msg)
raise SyncInventoryError(err_msg)
def create_journal_entry(self, severity, message):
"""
@@ -900,7 +927,7 @@ class PhysicalDevice:
# Check if the severity is valid
if severity not in ["info", "success", "warning", "danger"]:
self.logger.warning(
f"Value {severity} not valid for NB journal entries."
"Value %s not valid for NB journal entries.", severity
)
return False
journal = {
@@ -911,12 +938,13 @@ class PhysicalDevice:
}
try:
self.nb_journals.create(journal)
self.logger.debug(f"Host {self.name}: Created journal entry in NetBox")
self.logger.debug("Host %s: Created journal entry in NetBox", self.name)
return True
except NetboxRequestError as e:
self.logger.warning(
"Unable to create journal entry for "
f"{self.name}: NB returned {e}"
"Unable to create journal entry for %s: NB returned %s",
self.name,
e,
)
return False
return False
@@ -941,8 +969,9 @@ class PhysicalDevice:
tmpls_from_zabbix.pop(pos)
succesfull_templates.append(nb_tmpl)
self.logger.debug(
f"Host {self.name}: template "
f"{nb_tmpl['name']} is present in Zabbix."
"Host %s: Template '%s' is present in Zabbix.",
self.name,
nb_tmpl["name"],
)
break
if (

View File

@@ -11,6 +11,7 @@ class Hostgroup:
Takes type (vm or dev) and NB object"""
# pylint: disable=too-many-arguments, disable=too-many-positional-arguments
# pylint: disable=logging-fstring-interpolation
def __init__(
self,
obj_type,
@@ -85,6 +86,7 @@ class Hostgroup:
format_options["location"] = (
str(self.nb.location) if self.nb.location else None
)
format_options["rack"] = self.nb.rack.name if self.nb.rack else None
# Variables only applicable for VM's
if self.type == "vm":
# Check if a cluster is configured. Could also be configured in a site.
@@ -92,6 +94,11 @@ class Hostgroup:
format_options["cluster"] = self.nb.cluster.name
format_options["cluster_type"] = self.nb.cluster.type.name
self.format_options = format_options
self.logger.debug(
"Host %s: Resolved properties for use in hostgroups: %s",
self.name,
self.format_options,
)
def set_nesting(
self, nested_sitegroup_flag, nested_region_flag, nb_groups, nb_regions
@@ -102,49 +109,51 @@ class Hostgroup:
"region": {"flag": nested_region_flag, "data": nb_regions},
}
def generate(self, hg_format=None):
def generate(self, hg_format):
"""Generate hostgroup based on a provided format"""
# Set format to default in case its not specified
if not hg_format:
hg_format = (
"site/manufacturer/role" if self.type == "dev" else "cluster/role"
)
# Split all given names
hg_output = []
hg_items = hg_format.split("/")
for hg_item in hg_items:
# Check if requested data is available as option for this host
if hg_item not in self.format_options:
# Check if a custom field exists with this name
cf_data = self.custom_field_lookup(hg_item)
# CF does not exist
if not cf_data["result"]:
msg = (
f"Unable to generate hostgroup for host {self.name}. "
f"Item type {hg_item} not supported."
)
self.logger.error(msg)
raise HostgroupError(msg)
# CF data is populated
if cf_data["cf"]:
hg_output.append(cf_data["cf"])
if hg_item.startswith(("'", '"')) and hg_item.endswith(("'", '"')):
hg_item = hg_item.strip("\'")
hg_item = hg_item.strip('\"')
hg_output.append(hg_item)
else:
# Check if a custom field exists with this name
cf_data = self.custom_field_lookup(hg_item)
# CF does not exist
if not cf_data["result"]:
msg = (
f"Unable to generate hostgroup for host {self.name}. "
f"Item type {hg_item} not supported."
)
self.logger.error(msg)
raise HostgroupError(msg)
# CF data is populated
if cf_data["cf"]:
hg_output.append(cf_data["cf"])
continue
# Check if there is a value associated to the variable.
# For instance, if a device has no location, do not use it with hostgroup calculation
hostgroup_value = self.format_options[hg_item]
if hostgroup_value:
hg_output.append(hostgroup_value)
else:
self.logger.info(
"Host %s: Used field '%s' has no value.", self.name, hg_item
)
# Check if the hostgroup is populated with at least one item.
if bool(hg_output):
return "/".join(hg_output)
msg = (
f"Unable to generate hostgroup for host {self.name}."
" Not enough valid items. This is most likely"
" due to the use of custom fields that are empty"
" or an invalid hostgroup format."
f"Host {self.name}: Generating hostgroup name for '{hg_format}' failed. "
f"This is most likely due to fields that have no value."
)
self.logger.error(msg)
raise HostgroupError(msg)
self.logger.warning(msg)
return None
def list_formatoptions(self):
"""

View File

@@ -3,6 +3,7 @@
"""
All of the Zabbix Usermacro related configuration
"""
from logging import getLogger
from modules.tools import field_mapper, remove_duplicates
@@ -15,7 +16,7 @@ class ZabbixTags:
self,
nb,
tag_map,
tag_sync,
tag_sync=False,
tag_lower=True,
tag_name=None,
tag_value=None,
@@ -76,7 +77,7 @@ class ZabbixTags:
else:
tag["tag"] = tag_name
else:
self.logger.warning(f"Tag {tag_name} is not a valid tag name, skipping.")
self.logger.warning("Tag '%s' is not a valid tag name, skipping.", tag_name)
return False
if self.validate_value(tag_value):
@@ -85,8 +86,8 @@ class ZabbixTags:
else:
tag["value"] = tag_value
else:
self.logger.warning(
f"Tag {tag_name} has an invalid value: '{tag_value}', skipping."
self.logger.info(
"Tag '%s' has an invalid value: '%s', skipping.", tag_name, tag_value
)
return False
return tag
@@ -99,7 +100,7 @@ class ZabbixTags:
tags = []
# Parse the field mapper for tags
if self.tag_map:
self.logger.debug(f"Host {self.nb.name}: Starting tag mapper")
self.logger.debug("Host %s: Starting tag mapper.", self.nb.name)
field_tags = field_mapper(self.nb.name, self.tag_map, self.nb, self.logger)
for tag, value in field_tags.items():
t = self.render_tag(tag, value)
@@ -130,4 +131,6 @@ class ZabbixTags:
if t:
tags.append(t)
return remove_duplicates(tags, sortkey="tag")
tags = remove_duplicates(tags, sortkey="tag")
self.logger.debug("Host %s: Resolved tags: %s", self.name, tags)
return tags

View File

@@ -1,6 +1,8 @@
"""A collection of tools used by several classes"""
from modules.exceptions import HostgroupError
def convert_recordset(recordset):
"""Converts netbox RedcordSet to list of dicts."""
recordlist = []
@@ -71,20 +73,23 @@ def field_mapper(host, mapper, nbdevice, logger):
data[zbx_field] = str(value)
elif not value:
# empty value should just be an empty string for API compatibility
logger.debug(
f"Host {host}: NetBox lookup for "
f"'{nb_field}' returned an empty value"
logger.info(
"Host %s: NetBox lookup for '%s' returned an empty value.",
host,
nb_field,
)
data[zbx_field] = ""
else:
# Value is not a string or numeral, probably not what the user expected.
logger.error(
f"Host {host}: Lookup for '{nb_field}'"
" returned an unexpected type: it will be skipped."
logger.info(
"Host %s: Lookup for '%s' returned an unexpected type: it will be skipped.",
host,
nb_field,
)
logger.debug(
f"Host {host}: Field mapping complete. "
f"Mapped {len(list(filter(None, data.values())))} field(s)"
"Host %s: Field mapping complete. Mapped %s field(s).",
host,
len(list(filter(None, data.values()))),
)
return data
@@ -101,7 +106,9 @@ def remove_duplicates(input_list, sortkey=None):
return output_list
def verify_hg_format(hg_format, device_cfs=None, vm_cfs=None, hg_type="dev", logger=None):
def verify_hg_format(
hg_format, device_cfs=None, vm_cfs=None, hg_type="dev", logger=None
):
"""
Verifies hostgroup field format
"""
@@ -109,49 +116,57 @@ def verify_hg_format(hg_format, device_cfs=None, vm_cfs=None, hg_type="dev", log
device_cfs = []
if not vm_cfs:
vm_cfs = []
allowed_objects = {"dev": ["location",
"rack",
"role",
"manufacturer",
"region",
"site",
"site_group",
"tenant",
"tenant_group",
"platform",
"cluster"]
,"vm": ["location",
"role",
"manufacturer",
"region",
"site",
"site_group",
"tenant",
"tenant_group",
"cluster",
"device",
"platform"]
,"cfs": {"dev": [], "vm": []}
}
allowed_objects = {
"dev": [
"location",
"rack",
"role",
"manufacturer",
"region",
"site",
"site_group",
"tenant",
"tenant_group",
"platform",
"cluster",
],
"vm": [
"cluster_type",
"role",
"manufacturer",
"region",
"site",
"site_group",
"tenant",
"tenant_group",
"cluster",
"device",
"platform",
],
"cfs": {"dev": [], "vm": []},
}
for cf in device_cfs:
allowed_objects['cfs']['dev'].append(cf.name)
allowed_objects["cfs"]["dev"].append(cf.name)
for cf in vm_cfs:
allowed_objects['cfs']['vm'].append(cf.name)
allowed_objects["cfs"]["vm"].append(cf.name)
hg_objects = []
if isinstance(hg_format,list):
if isinstance(hg_format, list):
for f in hg_format:
hg_objects = hg_objects + f.split("/")
else:
hg_objects = hg_format.split("/")
hg_objects = sorted(set(hg_objects))
for hg_object in hg_objects:
if (hg_object not in allowed_objects[hg_type] and
hg_object not in allowed_objects['cfs'][hg_type]):
if (
hg_object not in allowed_objects[hg_type]
and hg_object not in allowed_objects["cfs"][hg_type]
and not hg_object.startswith(('"',"'"))
):
e = (
f"Hostgroup item {hg_object} is not valid. Make sure you"
" use valid items and separate them with '/'."
)
logger.error(e)
logger.warning(e)
raise HostgroupError(e)
@@ -159,7 +174,7 @@ def sanatize_log_output(data):
"""
Used for the update function to Zabbix which
shows the data that its using to update the host.
Removes and sensitive data from the input.
Removes any sensitive data from the input.
"""
if not isinstance(data, dict):
return data
@@ -168,7 +183,7 @@ def sanatize_log_output(data):
if "macros" in data:
for macro in sanitized_data["macros"]:
# Check if macro is secret type
if not macro["type"] == str(1):
if not (macro["type"] == str(1) or macro["type"] == 1):
continue
macro["value"] = "********"
# Check for interface data

View File

@@ -3,10 +3,11 @@
"""
All of the Zabbix Usermacro related configuration
"""
from logging import getLogger
from re import match
from modules.tools import field_mapper
from modules.tools import field_mapper, sanatize_log_output
class ZabbixUsermacros:
@@ -57,8 +58,11 @@ class ZabbixUsermacros:
macro["macro"] = str(macro_name)
if isinstance(macro_properties, dict):
if not "value" in macro_properties:
self.logger.warning(f"Host {self.name}: Usermacro {macro_name} has "
"no value in Netbox, skipping.")
self.logger.info(
"Host %s: Usermacro %s has no value in Netbox, skipping.",
self.name,
macro_name,
)
return False
macro["value"] = macro_properties["value"]
@@ -83,12 +87,17 @@ class ZabbixUsermacros:
macro["description"] = ""
else:
self.logger.warning(f"Host {self.name}: Usermacro {macro_name} "
"has no value, skipping.")
self.logger.info(
"Host %s: Usermacro %s has no value, skipping.",
self.name,
macro_name,
)
return False
else:
self.logger.error(
f"Host {self.name}: Usermacro {macro_name} is not a valid usermacro name, skipping."
self.logger.warning(
"Host %s: Usermacro %s is not a valid usermacro name, skipping.",
self.name,
macro_name,
)
return False
return macro
@@ -98,9 +107,10 @@ class ZabbixUsermacros:
Generate full set of Usermacros
"""
macros = []
data = {}
# Parse the field mapper for usermacros
if self.usermacro_map:
self.logger.debug(f"Host {self.nb.name}: Starting usermacro mapper")
self.logger.debug("Host %s: Starting usermacro mapper.", self.nb.name)
field_macros = field_mapper(
self.nb.name, self.usermacro_map, self.nb, self.logger
)
@@ -119,4 +129,8 @@ class ZabbixUsermacros:
m = self.render_macro(macro, properties)
if m:
macros.append(m)
data = {"macros": macros}
self.logger.debug(
"Host %s: Resolved macros: %s", self.name, sanatize_log_output(data)
)
return macros

View File

@@ -2,7 +2,6 @@
"""Module that hosts all functions for virtual machine processing"""
from modules.device import PhysicalDevice
from modules.exceptions import InterfaceConfigError, SyncInventoryError, TemplateError
from modules.hostgroups import Hostgroup
from modules.interface import ZabbixInterface
from modules.config import load_config
# Load config
@@ -16,6 +15,7 @@ class VirtualMachine(PhysicalDevice):
super().__init__(*args, **kwargs)
self.hostgroup = None
self.zbx_template_names = None
self.hostgroup_type = "vm"
def _inventory_map(self):
"""use VM inventory maps"""
@@ -29,25 +29,6 @@ class VirtualMachine(PhysicalDevice):
"""use VM tag maps"""
return config["vm_tag_map"]
def set_hostgroup(self, hg_format, nb_site_groups, nb_regions):
"""Set the hostgroup for this device"""
# Create new Hostgroup instance
hg = Hostgroup(
"vm",
self.nb,
self.nb_api_version,
logger=self.logger,
nested_sitegroup_flag=config["traverse_site_groups"],
nested_region_flag=config["traverse_regions"],
nb_groups=nb_site_groups,
nb_regions=nb_regions,
)
# Generate hostgroup based on hostgroup format
if isinstance(hg_format, list):
self.hostgroups = [hg.generate(f) for f in hg_format]
else:
self.hostgroups.append(hg.generate(hg_format))
def set_vm_template(self):
"""Set Template for VMs. Overwrites default class
to skip a lookup of custom fields."""

View File

@@ -2,6 +2,7 @@
# pylint: disable=invalid-name, logging-not-lazy, too-many-locals, logging-fstring-interpolation
"""NetBox to Zabbix sync script."""
import argparse
import logging
import ssl
@@ -67,15 +68,15 @@ def main(arguments):
try:
# Get NetBox version
nb_version = netbox.version
logger.debug(f"NetBox version is {nb_version}.")
logger.debug("NetBox version is %s.", nb_version)
except RequestsConnectionError:
logger.error(
f"Unable to connect to NetBox with URL {netbox_host}."
" Please check the URL and status of NetBox."
"Unable to connect to NetBox with URL %s. Please check the URL and status of NetBox.",
netbox_host,
)
sys.exit(1)
except NBRequestError as e:
logger.error(f"NetBox error: {e}")
logger.error("NetBox error: %s", e)
sys.exit(1)
# Check if the provided Hostgroup layout is valid
device_cfs = []
@@ -83,14 +84,18 @@ def main(arguments):
device_cfs = list(
netbox.extras.custom_fields.filter(type="text", content_types="dcim.device")
)
verify_hg_format(config["hostgroup_format"],
device_cfs=device_cfs, hg_type="dev", logger=logger)
verify_hg_format(
config["hostgroup_format"], device_cfs=device_cfs, hg_type="dev", logger=logger
)
if config["sync_vms"]:
vm_cfs = list(
netbox.extras.custom_fields.filter(type="text",
content_types="virtualization.virtualmachine")
netbox.extras.custom_fields.filter(
type="text", content_types="virtualization.virtualmachine"
)
)
verify_hg_format(
config["vm_hostgroup_format"], vm_cfs=vm_cfs, hg_type="vm", logger=logger
)
verify_hg_format(config["vm_hostgroup_format"], vm_cfs=vm_cfs, hg_type="vm", logger=logger)
# Set Zabbix API
try:
ssl_ctx = ssl.create_default_context()
@@ -120,7 +125,8 @@ def main(arguments):
netbox_vms = []
if config["sync_vms"]:
netbox_vms = list(
netbox.virtualization.virtual_machines.filter(**config["nb_vm_filter"]))
netbox.virtualization.virtual_machines.filter(**config["nb_vm_filter"])
)
netbox_site_groups = convert_recordset((netbox.dcim.site_groups.all()))
netbox_regions = convert_recordset(netbox.dcim.regions.all())
netbox_journals = netbox.extras.journal_entries
@@ -141,15 +147,22 @@ def main(arguments):
# Go through all NetBox devices
for nb_vm in netbox_vms:
try:
vm = VirtualMachine(nb_vm, zabbix, netbox_journals, nb_version,
config["create_journal"], logger)
logger.debug(f"Host {vm.name}: started operations on VM.")
vm = VirtualMachine(
nb_vm,
zabbix,
netbox_journals,
nb_version,
config["create_journal"],
logger,
)
logger.debug("Host %s: Started operations on VM.", vm.name)
vm.set_vm_template()
# Check if a valid template has been found for this VM.
if not vm.zbx_template_names:
continue
vm.set_hostgroup(config["vm_hostgroup_format"],
netbox_site_groups, netbox_regions)
vm.set_hostgroup(
config["vm_hostgroup_format"], netbox_site_groups, netbox_regions
)
# Check if a valid hostgroup has been found for this VM.
if not vm.hostgroups:
continue
@@ -162,13 +175,12 @@ def main(arguments):
# Delete device from Zabbix
# and remove hostID from NetBox.
vm.cleanup()
logger.debug(f"VM {vm.name}: cleanup complete")
logger.info("VM %s: cleanup complete", vm.name)
continue
# Device has been added to NetBox
# but is not in Activate state
logger.info(
f"VM {vm.name}: skipping since this VM is "
f"not in the active state."
"VM %s: Skipping since this VM is not in the active state.", vm.name
)
continue
# Check if the VM is in the disabled state
@@ -200,18 +212,31 @@ def main(arguments):
for nb_device in netbox_devices:
try:
# Set device instance set data such as hostgroup and template information.
device = PhysicalDevice(nb_device, zabbix, netbox_journals, nb_version,
config["create_journal"], logger)
logger.debug(f"Host {device.name}: started operations on device.")
device.set_template(config["templates_config_context"],
config["templates_config_context_overrule"])
device = PhysicalDevice(
nb_device,
zabbix,
netbox_journals,
nb_version,
config["create_journal"],
logger,
)
logger.debug("Host %s: Started operations on device.", device.name)
device.set_template(
config["templates_config_context"],
config["templates_config_context_overrule"],
)
# Check if a valid template has been found for this VM.
if not device.zbx_template_names:
continue
device.set_hostgroup(
config["hostgroup_format"], netbox_site_groups, netbox_regions)
config["hostgroup_format"], netbox_site_groups, netbox_regions
)
# Check if a valid hostgroup has been found for this VM.
if not device.hostgroups:
logger.warning(
"Host %s: Host has no valid hostgroups, Skipping this host...",
device.name,
)
continue
device.set_inventory(nb_device)
device.set_usermacros()
@@ -221,16 +246,16 @@ def main(arguments):
if device.isCluster() and config["clustering"]:
# Check if device is primary or secondary
if device.promoteMasterDevice():
e = f"Device {device.name}: is " f"part of cluster and primary."
logger.info(e)
logger.info(
"Device %s: is part of cluster and primary.", device.name
)
else:
# Device is secondary in cluster.
# Don't continue with this device.
e = (
f"Device {device.name}: is part of cluster "
f"but not primary. Skipping this host..."
logger.info(
"Device %s: Is part of cluster but not primary. Skipping this host...",
device.name,
)
logger.info(e)
continue
# Checks if device is in cleanup state
if device.status in config["zabbix_device_removal"]:
@@ -238,13 +263,13 @@ def main(arguments):
# Delete device from Zabbix
# and remove hostID from NetBox.
device.cleanup()
logger.info(f"Device {device.name}: cleanup complete")
logger.info("Device %s: cleanup complete", device.name)
continue
# Device has been added to NetBox
# but is not in Activate state
logger.info(
f"Device {device.name}: skipping since this device is "
f"not in the active state."
"Device %s: Skipping since this device is not in the active state.",
device.name,
)
continue
# Check if the device is in the disabled state
@@ -280,7 +305,7 @@ if __name__ == "__main__":
description="A script to sync Zabbix with NetBox device data."
)
parser.add_argument(
"-v", "--verbose", help="Turn on debugging.", action="store_true"
"-v", "--verbose", help="Turn on verbose logging.", action="store_true"
)
parser.add_argument(
"-vv", "--debug", help="Turn on debugging.", action="store_true"

View File

@@ -1,2 +1,2 @@
pynetbox==7.4.1
zabbix-utils==2.0.2
zabbix-utils==2.0.3

372
tests/test_hostgroups.py Normal file
View File

@@ -0,0 +1,372 @@
"""Tests for the Hostgroup class in the hostgroups module."""
import unittest
from unittest.mock import MagicMock, patch, call
from modules.hostgroups import Hostgroup
from modules.exceptions import HostgroupError
class TestHostgroups(unittest.TestCase):
"""Test class for Hostgroup functionality."""
def setUp(self):
"""Set up test fixtures."""
# Create mock logger
self.mock_logger = MagicMock()
# *** Mock NetBox Device setup ***
# Create mock device with all properties
self.mock_device = MagicMock()
self.mock_device.name = "test-device"
# Set up site information
site = MagicMock()
site.name = "TestSite"
# Set up region information
region = MagicMock()
region.name = "TestRegion"
# Ensure region string representation returns the name
region.__str__.return_value = "TestRegion"
site.region = region
# Set up site group information
site_group = MagicMock()
site_group.name = "TestSiteGroup"
# Ensure site group string representation returns the name
site_group.__str__.return_value = "TestSiteGroup"
site.group = site_group
self.mock_device.site = site
# Set up role information (varies based on NetBox version)
self.mock_device_role = MagicMock()
self.mock_device_role.name = "TestRole"
# Ensure string representation returns the name
self.mock_device_role.__str__.return_value = "TestRole"
self.mock_device.device_role = self.mock_device_role
self.mock_device.role = self.mock_device_role
# Set up tenant information
tenant = MagicMock()
tenant.name = "TestTenant"
# Ensure tenant string representation returns the name
tenant.__str__.return_value = "TestTenant"
tenant_group = MagicMock()
tenant_group.name = "TestTenantGroup"
# Ensure tenant group string representation returns the name
tenant_group.__str__.return_value = "TestTenantGroup"
tenant.group = tenant_group
self.mock_device.tenant = tenant
# Set up platform information
platform = MagicMock()
platform.name = "TestPlatform"
self.mock_device.platform = platform
# Device-specific properties
device_type = MagicMock()
manufacturer = MagicMock()
manufacturer.name = "TestManufacturer"
device_type.manufacturer = manufacturer
self.mock_device.device_type = device_type
location = MagicMock()
location.name = "TestLocation"
# Ensure location string representation returns the name
location.__str__.return_value = "TestLocation"
self.mock_device.location = location
# Custom fields
self.mock_device.custom_fields = {"test_cf": "TestCF"}
# *** Mock NetBox VM setup ***
# Create mock VM with all properties
self.mock_vm = MagicMock()
self.mock_vm.name = "test-vm"
# Reuse site from device
self.mock_vm.site = site
# Set up role for VM
self.mock_vm.role = self.mock_device_role
# Set up tenant for VM (same as device)
self.mock_vm.tenant = tenant
# Set up platform for VM (same as device)
self.mock_vm.platform = platform
# VM-specific properties
cluster = MagicMock()
cluster.name = "TestCluster"
cluster_type = MagicMock()
cluster_type.name = "TestClusterType"
cluster.type = cluster_type
self.mock_vm.cluster = cluster
# Custom fields
self.mock_vm.custom_fields = {"test_cf": "TestCF"}
# Mock data for nesting tests
self.mock_regions_data = [
{"name": "ParentRegion", "parent": None, "_depth": 0},
{"name": "TestRegion", "parent": "ParentRegion", "_depth": 1}
]
self.mock_groups_data = [
{"name": "ParentSiteGroup", "parent": None, "_depth": 0},
{"name": "TestSiteGroup", "parent": "ParentSiteGroup", "_depth": 1}
]
def test_device_hostgroup_creation(self):
"""Test basic device hostgroup creation."""
hostgroup = Hostgroup("dev", self.mock_device, "4.0", self.mock_logger)
# Test the string representation
self.assertEqual(str(hostgroup), "Hostgroup for dev test-device")
# Check format options were set correctly
self.assertEqual(hostgroup.format_options["site"], "TestSite")
self.assertEqual(hostgroup.format_options["region"], "TestRegion")
self.assertEqual(hostgroup.format_options["site_group"], "TestSiteGroup")
self.assertEqual(hostgroup.format_options["role"], "TestRole")
self.assertEqual(hostgroup.format_options["tenant"], "TestTenant")
self.assertEqual(hostgroup.format_options["tenant_group"], "TestTenantGroup")
self.assertEqual(hostgroup.format_options["platform"], "TestPlatform")
self.assertEqual(hostgroup.format_options["manufacturer"], "TestManufacturer")
self.assertEqual(hostgroup.format_options["location"], "TestLocation")
def test_vm_hostgroup_creation(self):
"""Test basic VM hostgroup creation."""
hostgroup = Hostgroup("vm", self.mock_vm, "4.0", self.mock_logger)
# Test the string representation
self.assertEqual(str(hostgroup), "Hostgroup for vm test-vm")
# Check format options were set correctly
self.assertEqual(hostgroup.format_options["site"], "TestSite")
self.assertEqual(hostgroup.format_options["region"], "TestRegion")
self.assertEqual(hostgroup.format_options["site_group"], "TestSiteGroup")
self.assertEqual(hostgroup.format_options["role"], "TestRole")
self.assertEqual(hostgroup.format_options["tenant"], "TestTenant")
self.assertEqual(hostgroup.format_options["tenant_group"], "TestTenantGroup")
self.assertEqual(hostgroup.format_options["platform"], "TestPlatform")
self.assertEqual(hostgroup.format_options["cluster"], "TestCluster")
self.assertEqual(hostgroup.format_options["cluster_type"], "TestClusterType")
def test_invalid_object_type(self):
"""Test that an invalid object type raises an exception."""
with self.assertRaises(HostgroupError):
Hostgroup("invalid", self.mock_device, "4.0", self.mock_logger)
def test_device_hostgroup_formats(self):
"""Test different hostgroup formats for devices."""
hostgroup = Hostgroup("dev", self.mock_device, "4.0", self.mock_logger)
# Custom format: site/region
custom_result = hostgroup.generate("site/region")
self.assertEqual(custom_result, "TestSite/TestRegion")
# Custom format: site/tenant/platform/location
complex_result = hostgroup.generate("site/tenant/platform/location")
self.assertEqual(complex_result, "TestSite/TestTenant/TestPlatform/TestLocation")
def test_vm_hostgroup_formats(self):
"""Test different hostgroup formats for VMs."""
hostgroup = Hostgroup("vm", self.mock_vm, "4.0", self.mock_logger)
# Default format: cluster/role
default_result = hostgroup.generate("cluster/role")
self.assertEqual(default_result, "TestCluster/TestRole")
# Custom format: site/tenant
custom_result = hostgroup.generate("site/tenant")
self.assertEqual(custom_result, "TestSite/TestTenant")
# Custom format: cluster/cluster_type/platform
complex_result = hostgroup.generate("cluster/cluster_type/platform")
self.assertEqual(complex_result, "TestCluster/TestClusterType/TestPlatform")
def test_device_netbox_version_differences(self):
"""Test hostgroup generation with different NetBox versions."""
# NetBox v2.x
hostgroup_v2 = Hostgroup("dev", self.mock_device, "2.11", self.mock_logger)
self.assertEqual(hostgroup_v2.format_options["role"], "TestRole")
# NetBox v3.x
hostgroup_v3 = Hostgroup("dev", self.mock_device, "3.5", self.mock_logger)
self.assertEqual(hostgroup_v3.format_options["role"], "TestRole")
# NetBox v4.x (already tested in other methods)
def test_custom_field_lookup(self):
"""Test custom field lookup functionality."""
hostgroup = Hostgroup("dev", self.mock_device, "4.0", self.mock_logger)
# Test custom field exists and is populated
cf_result = hostgroup.custom_field_lookup("test_cf")
self.assertTrue(cf_result["result"])
self.assertEqual(cf_result["cf"], "TestCF")
# Test custom field doesn't exist
cf_result = hostgroup.custom_field_lookup("nonexistent_cf")
self.assertFalse(cf_result["result"])
self.assertIsNone(cf_result["cf"])
def test_hostgroup_with_custom_field(self):
"""Test hostgroup generation including a custom field."""
hostgroup = Hostgroup("dev", self.mock_device, "4.0", self.mock_logger)
# Generate with custom field included
result = hostgroup.generate("site/test_cf/role")
self.assertEqual(result, "TestSite/TestCF/TestRole")
def test_missing_hostgroup_format_item(self):
"""Test handling of missing hostgroup format items."""
# Create a device with minimal attributes
minimal_device = MagicMock()
minimal_device.name = "minimal-device"
minimal_device.site = None
minimal_device.tenant = None
minimal_device.platform = None
minimal_device.custom_fields = {}
# Create role
role = MagicMock()
role.name = "MinimalRole"
minimal_device.role = role
# Create device_type with manufacturer
device_type = MagicMock()
manufacturer = MagicMock()
manufacturer.name = "MinimalManufacturer"
device_type.manufacturer = manufacturer
minimal_device.device_type = device_type
# Create hostgroup
hostgroup = Hostgroup("dev", minimal_device, "4.0", self.mock_logger)
# Generate with default format
result = hostgroup.generate("site/manufacturer/role")
# Site is missing, so only manufacturer and role should be included
self.assertEqual(result, "MinimalManufacturer/MinimalRole")
# Test with invalid format
with self.assertRaises(HostgroupError):
hostgroup.generate("site/nonexistent/role")
def test_nested_region_hostgroups(self):
"""Test hostgroup generation with nested regions."""
# Mock the build_path function to return a predictable result
with patch('modules.hostgroups.build_path') as mock_build_path:
# Configure the mock to return a list of regions in the path
mock_build_path.return_value = ["ParentRegion", "TestRegion"]
# Create hostgroup with nested regions enabled
hostgroup = Hostgroup(
"dev",
self.mock_device,
"4.0",
self.mock_logger,
nested_region_flag=True,
nb_regions=self.mock_regions_data
)
# Generate hostgroup with region
result = hostgroup.generate("site/region/role")
# Should include the parent region
self.assertEqual(result, "TestSite/ParentRegion/TestRegion/TestRole")
def test_nested_sitegroup_hostgroups(self):
"""Test hostgroup generation with nested site groups."""
# Mock the build_path function to return a predictable result
with patch('modules.hostgroups.build_path') as mock_build_path:
# Configure the mock to return a list of site groups in the path
mock_build_path.return_value = ["ParentSiteGroup", "TestSiteGroup"]
# Create hostgroup with nested site groups enabled
hostgroup = Hostgroup(
"dev",
self.mock_device,
"4.0",
self.mock_logger,
nested_sitegroup_flag=True,
nb_groups=self.mock_groups_data
)
# Generate hostgroup with site_group
result = hostgroup.generate("site/site_group/role")
# Should include the parent site group
self.assertEqual(result, "TestSite/ParentSiteGroup/TestSiteGroup/TestRole")
def test_list_formatoptions(self):
"""Test the list_formatoptions method for debugging."""
hostgroup = Hostgroup("dev", self.mock_device, "4.0", self.mock_logger)
# Patch sys.stdout to capture print output
with patch('sys.stdout') as mock_stdout:
hostgroup.list_formatoptions()
# Check that print was called with expected output
calls = [call.write(f"The following options are available for host test-device"),
call.write('\n')]
mock_stdout.assert_has_calls(calls, any_order=True)
def test_vm_list_based_hostgroup_format(self):
"""Test VM hostgroup generation with a list-based format."""
hostgroup = Hostgroup("vm", self.mock_vm, "4.0", self.mock_logger)
# Test with a list of format strings
format_list = ["platform", "role", "cluster_type/cluster"]
# Generate hostgroups for each format in the list
hostgroups = []
for fmt in format_list:
result = hostgroup.generate(fmt)
if result: # Only add non-None results
hostgroups.append(result)
# Verify each expected hostgroup is generated
self.assertEqual(len(hostgroups), 3) # Should have 3 hostgroups
self.assertIn("TestPlatform", hostgroups)
self.assertIn("TestRole", hostgroups)
self.assertIn("TestClusterType/TestCluster", hostgroups)
def test_nested_format_splitting(self):
"""Test that formats with slashes correctly split and resolve each component."""
hostgroup = Hostgroup("vm", self.mock_vm, "4.0", self.mock_logger)
# Test a format with slashes that should be split
complex_format = "cluster_type/cluster"
result = hostgroup.generate(complex_format)
# Verify the format is correctly split and each component resolved
self.assertEqual(result, "TestClusterType/TestCluster")
def test_multiple_hostgroup_formats_device(self):
"""Test device hostgroup generation with multiple formats."""
hostgroup = Hostgroup("dev", self.mock_device, "4.0", self.mock_logger)
# Test with various formats that would be in a list
formats = [
"site",
"manufacturer/role",
"platform/location",
"tenant_group/tenant"
]
# Generate and check each format
results = {}
for fmt in formats:
results[fmt] = hostgroup.generate(fmt)
# Verify results
self.assertEqual(results["site"], "TestSite")
self.assertEqual(results["manufacturer/role"], "TestManufacturer/TestRole")
self.assertEqual(results["platform/location"], "TestPlatform/TestLocation")
self.assertEqual(results["tenant_group/tenant"], "TestTenantGroup/TestTenant")
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,137 @@
"""Tests for list-based hostgroup formats in configuration."""
import unittest
from unittest.mock import MagicMock, patch
from modules.hostgroups import Hostgroup
from modules.exceptions import HostgroupError
from modules.tools import verify_hg_format
class TestListHostgroupFormats(unittest.TestCase):
"""Test class for list-based hostgroup format functionality."""
def setUp(self):
"""Set up test fixtures."""
# Create mock logger
self.mock_logger = MagicMock()
# Create mock device
self.mock_device = MagicMock()
self.mock_device.name = "test-device"
# Set up site information
site = MagicMock()
site.name = "TestSite"
# Set up region information
region = MagicMock()
region.name = "TestRegion"
region.__str__.return_value = "TestRegion"
site.region = region
# Set device site
self.mock_device.site = site
# Set up role information
self.mock_device_role = MagicMock()
self.mock_device_role.name = "TestRole"
self.mock_device_role.__str__.return_value = "TestRole"
self.mock_device.role = self.mock_device_role
# Set up rack information
rack = MagicMock()
rack.name = "TestRack"
self.mock_device.rack = rack
# Set up platform information
platform = MagicMock()
platform.name = "TestPlatform"
self.mock_device.platform = platform
# Device-specific properties
device_type = MagicMock()
manufacturer = MagicMock()
manufacturer.name = "TestManufacturer"
device_type.manufacturer = manufacturer
self.mock_device.device_type = device_type
# Create mock VM
self.mock_vm = MagicMock()
self.mock_vm.name = "test-vm"
# Reuse site from device
self.mock_vm.site = site
# Set up role for VM
self.mock_vm.role = self.mock_device_role
# Set up platform for VM
self.mock_vm.platform = platform
# VM-specific properties
cluster = MagicMock()
cluster.name = "TestCluster"
cluster_type = MagicMock()
cluster_type.name = "TestClusterType"
cluster.type = cluster_type
self.mock_vm.cluster = cluster
def test_verify_list_based_hostgroup_format(self):
"""Test verification of list-based hostgroup formats."""
# List format with valid items
valid_format = ["region", "site", "rack"]
# List format with nested path
valid_nested_format = ["region", "site/rack"]
# List format with invalid item
invalid_format = ["region", "invalid_item", "rack"]
# Should not raise exception for valid formats
verify_hg_format(valid_format, hg_type="dev", logger=self.mock_logger)
verify_hg_format(valid_nested_format, hg_type="dev", logger=self.mock_logger)
# Should raise exception for invalid format
with self.assertRaises(HostgroupError):
verify_hg_format(invalid_format, hg_type="dev", logger=self.mock_logger)
def test_simulate_hostgroup_generation_from_config(self):
"""Simulate how the main script would generate hostgroups from list-based config."""
# Mock configuration with list-based hostgroup format
config_format = ["region", "site", "rack"]
hostgroup = Hostgroup("dev", self.mock_device, "4.0", self.mock_logger)
# Simulate the main script's hostgroup generation process
hostgroups = []
for fmt in config_format:
result = hostgroup.generate(fmt)
if result:
hostgroups.append(result)
# Check results
self.assertEqual(len(hostgroups), 3)
self.assertIn("TestRegion", hostgroups)
self.assertIn("TestSite", hostgroups)
self.assertIn("TestRack", hostgroups)
def test_vm_hostgroup_format_from_config(self):
"""Test VM hostgroup generation with list-based format."""
# Mock VM configuration with mixed format
config_format = ["platform", "role", "cluster_type/cluster"]
hostgroup = Hostgroup("vm", self.mock_vm, "4.0", self.mock_logger)
# Simulate the main script's hostgroup generation process
hostgroups = []
for fmt in config_format:
result = hostgroup.generate(fmt)
if result:
hostgroups.append(result)
# Check results
self.assertEqual(len(hostgroups), 3)
self.assertIn("TestPlatform", hostgroups)
self.assertIn("TestRole", hostgroups)
self.assertIn("TestClusterType/TestCluster", hostgroups)
if __name__ == "__main__":
unittest.main()

View File

@@ -88,7 +88,7 @@ class TestZabbixUsermacros(unittest.TestCase):
macros = ZabbixUsermacros(self.nb, {}, False, logger=self.logger)
result = macros.render_macro("{$FOO}", {"type": "text"})
self.assertFalse(result)
self.logger.warning.assert_called()
self.logger.info.assert_called()
def test_render_macro_str(self):
macros = ZabbixUsermacros(self.nb, {}, False, logger=self.logger)
@@ -102,7 +102,7 @@ class TestZabbixUsermacros(unittest.TestCase):
macros = ZabbixUsermacros(self.nb, {}, False, logger=self.logger)
result = macros.render_macro("FOO", "bar")
self.assertFalse(result)
self.logger.error.assert_called()
self.logger.warning.assert_called()
def test_generate_from_map(self):
nb = DummyNB(memory="bar", role="baz")