🚨 Formatted and linted files

This commit is contained in:
Wouter de Bruijn 2025-02-26 14:00:18 +01:00
parent 0c798ec968
commit b314b2c883
No known key found for this signature in database
GPG Key ID: AC71F96733B92BFA
8 changed files with 514 additions and 304 deletions

View File

@ -3,48 +3,61 @@
""" """
Device specific handeling for NetBox to Zabbix Device specific handeling for NetBox to Zabbix
""" """
from os import sys
from re import search
from copy import deepcopy from copy import deepcopy
from logging import getLogger from logging import getLogger
from os import sys
from re import search
from zabbix_utils import APIRequestError from zabbix_utils import APIRequestError
from modules.exceptions import (SyncInventoryError, TemplateError, SyncExternalError,
InterfaceConfigError, JournalError) from modules.exceptions import (
from modules.interface import ZabbixInterface InterfaceConfigError,
from modules.usermacros import ZabbixUsermacros JournalError,
from modules.tags import ZabbixTags SyncExternalError,
SyncInventoryError,
TemplateError,
)
from modules.hostgroups import Hostgroup from modules.hostgroups import Hostgroup
from modules.interface import ZabbixInterface
from modules.tags import ZabbixTags
from modules.tools import field_mapper, remove_duplicates from modules.tools import field_mapper, remove_duplicates
from modules.usermacros import ZabbixUsermacros
try: try:
from config import ( from config import (
template_cf, device_cf, device_cf,
traverse_site_groups,
traverse_regions,
inventory_sync,
inventory_mode,
device_inventory_map, device_inventory_map,
usermacro_sync, device_tag_map,
device_usermacro_map, device_usermacro_map,
tag_sync, inventory_mode,
inventory_sync,
tag_lower, tag_lower,
tag_name, tag_name,
tag_sync,
tag_value, tag_value,
device_tag_map template_cf,
traverse_regions,
traverse_site_groups,
usermacro_sync,
) )
except ModuleNotFoundError: except ModuleNotFoundError:
print("Configuration file config.py not found in main directory." print(
"Please create the file or rename the config.py.example file to config.py.") "Configuration file config.py not found in main directory."
"Please create the file or rename the config.py.example file to config.py."
)
sys.exit(0) sys.exit(0)
class PhysicalDevice():
class PhysicalDevice:
# pylint: disable=too-many-instance-attributes, too-many-arguments, too-many-positional-arguments # pylint: disable=too-many-instance-attributes, too-many-arguments, too-many-positional-arguments
""" """
Represents Network device. Represents Network device.
INPUT: (NetBox device class, ZabbixAPI class, journal flag, NB journal class) INPUT: (NetBox device class, ZabbixAPI class, journal flag, NB journal class)
""" """
def __init__(self, nb, zabbix, nb_journal_class, nb_version, journal=None, logger=None): def __init__(
self, nb, zabbix, nb_journal_class, nb_version, journal=None, logger=None
):
self.nb = nb self.nb = nb
self.id = nb.id self.id = nb.id
self.name = nb.name self.name = nb.name
@ -77,15 +90,15 @@ class PhysicalDevice():
return self.__repr__() return self.__repr__()
def _inventory_map(self): def _inventory_map(self):
""" Use device inventory maps """ """Use device inventory maps"""
return device_inventory_map return device_inventory_map
def _usermacro_map(self): def _usermacro_map(self):
""" Use device inventory maps """ """Use device inventory maps"""
return device_usermacro_map return device_usermacro_map
def _tag_map(self): def _tag_map(self):
""" Use device host tag maps """ """Use device host tag maps"""
return device_tag_map return device_tag_map
def _setBasics(self): def _setBasics(self):
@ -112,30 +125,38 @@ class PhysicalDevice():
# Validate hostname format. # Validate hostname format.
odd_character_list = ["ä", "ö", "ü", "Ä", "Ö", "Ü", "ß"] odd_character_list = ["ä", "ö", "ü", "Ä", "Ö", "Ü", "ß"]
self.use_visible_name = False self.use_visible_name = False
if (any(letter in self.name for letter in odd_character_list) or if any(letter in self.name for letter in odd_character_list) or bool(
bool(search('[\u0400-\u04FF]', self.name))): search("[\u0400-\u04ff]", self.name)
):
self.name = f"NETBOX_ID{self.id}" self.name = f"NETBOX_ID{self.id}"
self.visible_name = self.nb.name self.visible_name = self.nb.name
self.use_visible_name = True self.use_visible_name = True
self.logger.info(f"Host {self.visible_name} contains special characters. " self.logger.info(
f"Using {self.name} as name for the NetBox object " f"Host {self.visible_name} contains special characters. "
f"and using {self.visible_name} as visible name in Zabbix.") f"Using {self.name} as name for the NetBox object "
f"and using {self.visible_name} as visible name in Zabbix."
)
else: else:
pass pass
def set_hostgroup(self, hg_format, nb_site_groups, nb_regions): def set_hostgroup(self, hg_format, nb_site_groups, nb_regions):
"""Set the hostgroup for this device""" """Set the hostgroup for this device"""
# Create new Hostgroup instance # Create new Hostgroup instance
hg = Hostgroup("dev", self.nb, self.nb_api_version, logger=self.logger, hg = Hostgroup(
nested_sitegroup_flag=traverse_site_groups, "dev",
nested_region_flag=traverse_regions, self.nb,
nb_groups=nb_site_groups, self.nb_api_version,
nb_regions=nb_regions) logger=self.logger,
nested_sitegroup_flag=traverse_site_groups,
nested_region_flag=traverse_regions,
nb_groups=nb_site_groups,
nb_regions=nb_regions,
)
# Generate hostgroup based on hostgroup format # Generate hostgroup based on hostgroup format
self.hostgroup = hg.generate(hg_format) self.hostgroup = hg.generate(hg_format)
def set_template(self, prefer_config_context, overrule_custom): def set_template(self, prefer_config_context, overrule_custom):
""" Set Template """ """Set Template"""
self.zbx_template_names = None self.zbx_template_names = None
# Gather templates ONLY from the device specific context # Gather templates ONLY from the device specific context
if prefer_config_context: if prefer_config_context:
@ -159,7 +180,7 @@ class PhysicalDevice():
return True return True
def get_templates_cf(self): def get_templates_cf(self):
""" Get template from custom field """ """Get template from custom field"""
# Get Zabbix templates from the device type # Get Zabbix templates from the device type
device_type_cfs = self.nb.device_type.custom_fields device_type_cfs = self.nb.device_type.custom_fields
# Check if the ZBX Template CF is present # Check if the ZBX Template CF is present
@ -167,20 +188,26 @@ class PhysicalDevice():
# Set value to template # Set value to template
return [device_type_cfs[template_cf]] return [device_type_cfs[template_cf]]
# Custom field not found, return error # Custom field not found, return error
e = (f"Custom field {template_cf} not " e = (
f"Custom field {template_cf} not "
f"found for {self.nb.device_type.manufacturer.name}" f"found for {self.nb.device_type.manufacturer.name}"
f" - {self.nb.device_type.display}.") f" - {self.nb.device_type.display}."
)
raise TemplateError(e) raise TemplateError(e)
def get_templates_context(self): def get_templates_context(self):
""" Get Zabbix templates from the device context """ """Get Zabbix templates from the device context"""
if "zabbix" not in self.config_context: if "zabbix" not in self.config_context:
e = (f"Host {self.name}: Key 'zabbix' not found in config " e = (
"context for template lookup") f"Host {self.name}: Key 'zabbix' not found in config "
"context for template lookup"
)
raise TemplateError(e) raise TemplateError(e)
if "templates" not in self.config_context["zabbix"]: if "templates" not in self.config_context["zabbix"]:
e = (f"Host {self.name}: Key 'templates' not found in config " e = (
"context 'zabbix' for template lookup") f"Host {self.name}: Key 'templates' not found in config "
"context 'zabbix' for template lookup"
)
raise TemplateError(e) raise TemplateError(e)
# Check if format is list or string. # Check if format is list or string.
if isinstance(self.config_context["zabbix"]["templates"], str): if isinstance(self.config_context["zabbix"]["templates"], str):
@ -188,25 +215,31 @@ class PhysicalDevice():
return self.config_context["zabbix"]["templates"] return self.config_context["zabbix"]["templates"]
def set_inventory(self, nbdevice): def set_inventory(self, nbdevice):
""" Set host inventory """ """Set host inventory"""
# Set inventory mode. Default is disabled (see class init function). # Set inventory mode. Default is disabled (see class init function).
if inventory_mode == "disabled": if inventory_mode == "disabled":
if inventory_sync: if inventory_sync:
self.logger.error(f"Host {self.name}: Unable to map NetBox inventory to Zabbix. " self.logger.error(
"Inventory sync is enabled in config but inventory mode is disabled.") f"Host {self.name}: Unable to map NetBox inventory to Zabbix. "
"Inventory sync is enabled in config but inventory mode is disabled."
)
return True return True
if inventory_mode == "manual": if inventory_mode == "manual":
self.inventory_mode = 0 self.inventory_mode = 0
elif inventory_mode == "automatic": elif inventory_mode == "automatic":
self.inventory_mode = 1 self.inventory_mode = 1
else: else:
self.logger.error(f"Host {self.name}: Specified value for inventory mode in" self.logger.error(
f" config is not valid. Got value {inventory_mode}") f"Host {self.name}: Specified value for inventory mode in"
f" config is not valid. Got value {inventory_mode}"
)
return False return False
self.inventory = {} self.inventory = {}
if inventory_sync and self.inventory_mode in [0,1]: if inventory_sync and self.inventory_mode in [0, 1]:
self.logger.debug(f"Host {self.name}: Starting inventory mapper") self.logger.debug(f"Host {self.name}: Starting inventory mapper")
self.inventory = field_mapper(self.name, self._inventory_map(), nbdevice, self.logger) self.inventory = field_mapper(
self.name, self._inventory_map(), nbdevice, self.logger
)
return True return True
def isCluster(self): def isCluster(self):
@ -220,13 +253,17 @@ class PhysicalDevice():
Returns chassis master ID. Returns chassis master ID.
""" """
if not self.isCluster(): if not self.isCluster():
e = (f"Unable to proces {self.name} for cluster calculation: " e = (
f"not part of a cluster.") f"Unable to proces {self.name} for cluster calculation: "
f"not part of a cluster."
)
self.logger.warning(e) self.logger.warning(e)
raise SyncInventoryError(e) raise SyncInventoryError(e)
if not self.nb.virtual_chassis.master: if not self.nb.virtual_chassis.master:
e = (f"{self.name} is part of a NetBox virtual chassis which does " e = (
"not have a master configured. Skipping for this reason.") f"{self.name} is part of a NetBox virtual chassis which does "
"not have a master configured. Skipping for this reason."
)
self.logger.error(e) self.logger.error(e)
raise SyncInventoryError(e) raise SyncInventoryError(e)
return self.nb.virtual_chassis.master.id return self.nb.virtual_chassis.master.id
@ -239,9 +276,11 @@ class PhysicalDevice():
""" """
masterid = self.getClusterMaster() masterid = self.getClusterMaster()
if masterid == self.id: if masterid == self.id:
self.logger.debug(f"Host {self.name} is primary cluster member. " self.logger.debug(
f"Modifying hostname from {self.name} to " + f"Host {self.name} is primary cluster member. "
f"{self.nb.virtual_chassis.name}.") f"Modifying hostname from {self.name} to "
+ f"{self.nb.virtual_chassis.name}."
)
self.name = self.nb.virtual_chassis.name self.name = self.nb.virtual_chassis.name
return True return True
self.logger.debug(f"Host {self.name} is non-primary cluster member.") self.logger.debug(f"Host {self.name} is non-primary cluster member.")
@ -266,18 +305,24 @@ class PhysicalDevice():
# Go through all templates found in Zabbix # Go through all templates found in Zabbix
for zbx_template in templates: for zbx_template in templates:
# If the template names match # If the template names match
if zbx_template['name'] == nb_template: if zbx_template["name"] == nb_template:
# Set match variable to true, add template details # Set match variable to true, add template details
# to class variable and return debug log # to class variable and return debug log
template_match = True template_match = True
self.zbx_templates.append({"templateid": zbx_template['templateid'], self.zbx_templates.append(
"name": zbx_template['name']}) {
"templateid": zbx_template["templateid"],
"name": zbx_template["name"],
}
)
e = f"Host {self.name}: found template {zbx_template['name']}" e = f"Host {self.name}: found template {zbx_template['name']}"
self.logger.debug(e) self.logger.debug(e)
# Return error should the template not be found in Zabbix # Return error should the template not be found in Zabbix
if not template_match: if not template_match:
e = (f"Unable to find template {nb_template} " e = (
f"for host {self.name} in Zabbix. Skipping host...") f"Unable to find template {nb_template} "
f"for host {self.name} in Zabbix. Skipping host..."
)
self.logger.warning(e) self.logger.warning(e)
raise SyncInventoryError(e) raise SyncInventoryError(e)
@ -289,8 +334,8 @@ class PhysicalDevice():
""" """
# Go through all groups # Go through all groups
for group in groups: for group in groups:
if group['name'] == self.hostgroup: if group["name"] == self.hostgroup:
self.group_id = group['groupid'] self.group_id = group["groupid"]
e = f"Host {self.name}: matched group {group['name']}" e = f"Host {self.name}: matched group {group['name']}"
self.logger.debug(e) self.logger.debug(e)
return True return True
@ -304,10 +349,13 @@ class PhysicalDevice():
if self.zabbix_id: if self.zabbix_id:
try: try:
# Check if the Zabbix host exists in Zabbix # Check if the Zabbix host exists in Zabbix
zbx_host = bool(self.zabbix.host.get(filter={'hostid': self.zabbix_id}, zbx_host = bool(
output=[])) self.zabbix.host.get(filter={"hostid": self.zabbix_id}, output=[])
e = (f"Host {self.name}: was already deleted from Zabbix." )
" Removed link in NetBox.") e = (
f"Host {self.name}: was already deleted from Zabbix."
" Removed link in NetBox."
)
if zbx_host: if zbx_host:
# Delete host should it exists # Delete host should it exists
self.zabbix.host.delete(self.zabbix_id) self.zabbix.host.delete(self.zabbix_id)
@ -332,9 +380,9 @@ class PhysicalDevice():
""" """
# Validate the hostname or visible name field # Validate the hostname or visible name field
if not self.use_visible_name: if not self.use_visible_name:
zbx_filter = {'host': self.name} zbx_filter = {"host": self.name}
else: else:
zbx_filter = {'name': self.visible_name} zbx_filter = {"name": self.visible_name}
host = self.zabbix.host.get(filter=zbx_filter, output=[]) host = self.zabbix.host.get(filter=zbx_filter, output=[])
return bool(host) return bool(host)
@ -364,24 +412,33 @@ class PhysicalDevice():
""" """
Generates Usermacros Generates Usermacros
""" """
macros = ZabbixUsermacros(self.nb, self._usermacro_map(), macros = ZabbixUsermacros(
usermacro_sync, logger=self.logger, self.nb,
host=self.name) self._usermacro_map(),
usermacro_sync,
logger=self.logger,
host=self.name,
)
if macros.sync is False: if macros.sync is False:
self.usermacros = [] self.usermacros = []
self.usermacros = macros.generate() self.usermacros = macros.generate()
return True return True
def set_tags(self): def set_tags(self):
""" """
Generates Host Tags Generates Host Tags
""" """
tags = ZabbixTags(self.nb, self._tag_map(), tags = ZabbixTags(
tag_sync, tag_lower, tag_name=tag_name, self.nb,
tag_value=tag_value, logger=self.logger, self._tag_map(),
host=self.name) tag_sync,
tag_lower,
tag_name=tag_name,
tag_value=tag_value,
logger=self.logger,
host=self.name,
)
if tags.sync is False: if tags.sync is False:
self.tags = [] self.tags = []
@ -398,14 +455,16 @@ class PhysicalDevice():
# check if the key Zabbix is defined in the config context # check if the key Zabbix is defined in the config context
if not "zabbix" in self.nb.config_context: if not "zabbix" in self.nb.config_context:
return False return False
if ("proxy" in self.nb.config_context["zabbix"] and if (
not self.nb.config_context["zabbix"]["proxy"]): "proxy" in self.nb.config_context["zabbix"]
and not self.nb.config_context["zabbix"]["proxy"]
):
return False return False
# Proxy group takes priority over a proxy due # Proxy group takes priority over a proxy due
# to it being HA and therefore being more reliable # to it being HA and therefore being more reliable
# Includes proxy group fix since Zabbix <= 6 should ignore this # Includes proxy group fix since Zabbix <= 6 should ignore this
proxy_types = ["proxy"] proxy_types = ["proxy"]
if str(self.zabbix.version).startswith('7'): if str(self.zabbix.version).startswith("7"):
# Only insert groups in front of list for Zabbix7 # Only insert groups in front of list for Zabbix7
proxy_types.insert(0, "proxy_group") proxy_types.insert(0, "proxy_group")
for proxy_type in proxy_types: for proxy_type in proxy_types:
@ -419,15 +478,23 @@ class PhysicalDevice():
continue continue
# If the proxy name matches # If the proxy name matches
if proxy["name"] == proxy_name: if proxy["name"] == proxy_name:
self.logger.debug(f"Host {self.name}: using {proxy['type']}" self.logger.debug(
f" {proxy_name}") f"Host {self.name}: using {proxy['type']}" f" {proxy_name}"
)
self.zbxproxy = proxy self.zbxproxy = proxy
return True return True
self.logger.warning(f"Host {self.name}: unable to find proxy {proxy_name}") self.logger.warning(
f"Host {self.name}: unable to find proxy {proxy_name}"
)
return False return False
def createInZabbix(self, groups, templates, proxies, def createInZabbix(
description="Host added by NetBox sync script."): self,
groups,
templates,
proxies,
description="Host added by NetBox sync script.",
):
""" """
Creates Zabbix host object with parameters from NetBox object. Creates Zabbix host object with parameters from NetBox object.
""" """
@ -435,37 +502,40 @@ class PhysicalDevice():
if not self._zabbixHostnameExists(): if not self._zabbixHostnameExists():
# Set group and template ID's for host # Set group and template ID's for host
if not self.setZabbixGroupID(groups): if not self.setZabbixGroupID(groups):
e = (f"Unable to find group '{self.hostgroup}' " e = (
f"for host {self.name} in Zabbix.") f"Unable to find group '{self.hostgroup}' "
f"for host {self.name} in Zabbix."
)
self.logger.warning(e) self.logger.warning(e)
raise SyncInventoryError(e) raise SyncInventoryError(e)
self.zbxTemplatePrepper(templates) self.zbxTemplatePrepper(templates)
templateids = [] templateids = []
for template in self.zbx_templates: for template in self.zbx_templates:
templateids.append({'templateid': template['templateid']}) templateids.append({"templateid": template["templateid"]})
# Set interface, group and template configuration # Set interface, group and template configuration
interfaces = self.setInterfaceDetails() interfaces = self.setInterfaceDetails()
groups = [{"groupid": self.group_id}] groups = [{"groupid": self.group_id}]
# Set Zabbix proxy if defined # Set Zabbix proxy if defined
self.setProxy(proxies) self.setProxy(proxies)
# Set basic data for host creation # Set basic data for host creation
create_data = {"host": self.name, create_data = {
"name": self.visible_name, "host": self.name,
"status": self.zabbix_state, "name": self.visible_name,
"interfaces": interfaces, "status": self.zabbix_state,
"groups": groups, "interfaces": interfaces,
"templates": templateids, "groups": groups,
"description": description, "templates": templateids,
"inventory_mode": self.inventory_mode, "description": description,
"inventory": self.inventory, "inventory_mode": self.inventory_mode,
"macros": self.usermacros, "inventory": self.inventory,
"tags": self.tags "macros": self.usermacros,
} "tags": self.tags,
}
# If a Zabbix proxy or Zabbix Proxy group has been defined # If a Zabbix proxy or Zabbix Proxy group has been defined
if self.zbxproxy: if self.zbxproxy:
# If a lower version than 7 is used, we can assume that # If a lower version than 7 is used, we can assume that
# the proxy is a normal proxy and not a proxy group # the proxy is a normal proxy and not a proxy group
if not str(self.zabbix.version).startswith('7'): if not str(self.zabbix.version).startswith("7"):
create_data["proxy_hostid"] = self.zbxproxy["id"] create_data["proxy_hostid"] = self.zbxproxy["id"]
else: else:
# Configure either a proxy or proxy group # Configure either a proxy or proxy group
@ -496,8 +566,8 @@ class PhysicalDevice():
""" """
final_data = [] final_data = []
# Check if the hostgroup is in a nested format and check each parent # Check if the hostgroup is in a nested format and check each parent
for pos in range(len(self.hostgroup.split('/'))): for pos in range(len(self.hostgroup.split("/"))):
zabbix_hg = self.hostgroup.rsplit('/', pos)[0] zabbix_hg = self.hostgroup.rsplit("/", pos)[0]
if self.lookupZabbixHostgroup(hostgroups, zabbix_hg): if self.lookupZabbixHostgroup(hostgroups, zabbix_hg):
# Hostgroup already exists # Hostgroup already exists
continue continue
@ -508,7 +578,9 @@ class PhysicalDevice():
e = f"Hostgroup '{zabbix_hg}': created in Zabbix." e = f"Hostgroup '{zabbix_hg}': created in Zabbix."
self.logger.info(e) self.logger.info(e)
# Add group to final data # Add group to final data
final_data.append({'groupid': groupid["groupids"][0], 'name': zabbix_hg}) final_data.append(
{"groupid": groupid["groupids"][0], "name": zabbix_hg}
)
except APIRequestError as e: except APIRequestError as e:
msg = f"Hostgroup '{zabbix_hg}': unable to create. Zabbix returned {str(e)}." msg = f"Hostgroup '{zabbix_hg}': unable to create. Zabbix returned {str(e)}."
self.logger.error(msg) self.logger.error(msg)
@ -535,20 +607,24 @@ class PhysicalDevice():
try: try:
self.zabbix.host.update(hostid=self.zabbix_id, **kwargs) self.zabbix.host.update(hostid=self.zabbix_id, **kwargs)
except APIRequestError as e: except APIRequestError as e:
e = (f"Host {self.name}: Unable to update. " e = (
f"Zabbix returned the following error: {str(e)}.") f"Host {self.name}: Unable to update. "
f"Zabbix returned the following error: {str(e)}."
)
self.logger.error(e) self.logger.error(e)
raise SyncExternalError(e) from None raise SyncExternalError(e) from None
self.logger.info(f"Updated host {self.name} with data {kwargs}.") self.logger.info(f"Updated host {self.name} with data {kwargs}.")
self.create_journal_entry("info", "Updated host in Zabbix with latest NB data.") self.create_journal_entry("info", "Updated host in Zabbix with latest NB data.")
def ConsistencyCheck(self, groups, templates, proxies, proxy_power, create_hostgroups): def ConsistencyCheck(
self, groups, templates, proxies, proxy_power, create_hostgroups
):
# pylint: disable=too-many-branches, too-many-statements # pylint: disable=too-many-branches, too-many-statements
""" """
Checks if Zabbix object is still valid with NetBox parameters. Checks if Zabbix object is still valid with NetBox parameters.
""" """
# If group is found or if the hostgroup is nested # If group is found or if the hostgroup is nested
if not self.setZabbixGroupID(groups) or len(self.hostgroup.split('/')) > 1: if not self.setZabbixGroupID(groups) or len(self.hostgroup.split("/")) > 1:
if create_hostgroups: if create_hostgroups:
# Script is allowed to create a new hostgroup # Script is allowed to create a new hostgroup
new_groups = self.createZabbixHostgroup(groups) new_groups = self.createZabbixHostgroup(groups)
@ -559,50 +635,59 @@ class PhysicalDevice():
if not self.group_id: if not self.group_id:
# Function returns true / false but also sets GroupID # Function returns true / false but also sets GroupID
if not self.setZabbixGroupID(groups) and not create_hostgroups: if not self.setZabbixGroupID(groups) and not create_hostgroups:
e = (f"Host {self.name}: different hostgroup is required but " e = (
"unable to create hostgroup without generation permission.") f"Host {self.name}: different hostgroup is required but "
"unable to create hostgroup without generation permission."
)
self.logger.warning(e) self.logger.warning(e)
raise SyncInventoryError(e) raise SyncInventoryError(e)
# Prepare templates and proxy config # Prepare templates and proxy config
self.zbxTemplatePrepper(templates) self.zbxTemplatePrepper(templates)
self.setProxy(proxies) self.setProxy(proxies)
# Get host object from Zabbix # Get host object from Zabbix
host = self.zabbix.host.get(filter={'hostid': self.zabbix_id}, host = self.zabbix.host.get(
selectInterfaces=['type', 'ip', filter={"hostid": self.zabbix_id},
'port', 'details', selectInterfaces=["type", "ip", "port", "details", "interfaceid"],
'interfaceid'], selectGroups=["groupid"],
selectGroups=["groupid"], selectHostGroups=["groupid"],
selectHostGroups=["groupid"], selectParentTemplates=["templateid"],
selectParentTemplates=["templateid"], selectInventory=list(self._inventory_map().values()),
selectInventory=list(self._inventory_map().values()), selectMacros=["macro", "value", "type", "description"],
selectMacros=["macro","value","type","description"], selectTags=["tag", "value"],
selectTags=["tag","value"] )
)
if len(host) > 1: if len(host) > 1:
e = (f"Got {len(host)} results for Zabbix hosts " e = (
f"with ID {self.zabbix_id} - hostname {self.name}.") f"Got {len(host)} results for Zabbix hosts "
f"with ID {self.zabbix_id} - hostname {self.name}."
)
self.logger.error(e) self.logger.error(e)
raise SyncInventoryError(e) raise SyncInventoryError(e)
if len(host) == 0: if len(host) == 0:
e = (f"Host {self.name}: No Zabbix host found. " e = (
f"This is likely the result of a deleted Zabbix host " f"Host {self.name}: No Zabbix host found. "
f"without zeroing the ID field in NetBox.") f"This is likely the result of a deleted Zabbix host "
f"without zeroing the ID field in NetBox."
)
self.logger.error(e) self.logger.error(e)
raise SyncInventoryError(e) raise SyncInventoryError(e)
host = host[0] host = host[0]
if host["host"] == self.name: if host["host"] == self.name:
self.logger.debug(f"Host {self.name}: hostname in-sync.") self.logger.debug(f"Host {self.name}: hostname in-sync.")
else: else:
self.logger.warning(f"Host {self.name}: hostname OUT of sync. " self.logger.warning(
f"Received value: {host['host']}") f"Host {self.name}: hostname OUT of sync. "
f"Received value: {host['host']}"
)
self.updateZabbixHost(host=self.name) self.updateZabbixHost(host=self.name)
# Execute check depending on wether the name is special or not # Execute check depending on wether the name is special or not
if self.use_visible_name: if self.use_visible_name:
if host["name"] == self.visible_name: if host["name"] == self.visible_name:
self.logger.debug(f"Host {self.name}: visible name in-sync.") self.logger.debug(f"Host {self.name}: visible name in-sync.")
else: else:
self.logger.warning(f"Host {self.name}: visible name OUT of sync." self.logger.warning(
f" Received value: {host['name']}") f"Host {self.name}: visible name OUT of sync."
f" Received value: {host['name']}"
)
self.updateZabbixHost(name=self.visible_name) self.updateZabbixHost(name=self.visible_name)
# Check if the templates are in-sync # Check if the templates are in-sync
@ -611,23 +696,24 @@ class PhysicalDevice():
# Prepare Templates for API parsing # Prepare Templates for API parsing
templateids = [] templateids = []
for template in self.zbx_templates: for template in self.zbx_templates:
templateids.append({'templateid': template['templateid']}) templateids.append({"templateid": template["templateid"]})
# Update Zabbix with NB templates and clear any old / lost templates # Update Zabbix with NB templates and clear any old / lost templates
self.updateZabbixHost(templates_clear=host["parentTemplates"], self.updateZabbixHost(
templates=templateids) templates_clear=host["parentTemplates"], templates=templateids
)
else: else:
self.logger.debug(f"Host {self.name}: template(s) in-sync.") self.logger.debug(f"Host {self.name}: template(s) in-sync.")
# Check if Zabbix version is 6 or higher. Issue #93 # Check if Zabbix version is 6 or higher. Issue #93
group_dictname = "hostgroups" group_dictname = "hostgroups"
if str(self.zabbix.version).startswith(('6', '5')): if str(self.zabbix.version).startswith(("6", "5")):
group_dictname = "groups" group_dictname = "groups"
for group in host[group_dictname]: for group in host[group_dictname]:
if group["groupid"] == self.group_id: if group["groupid"] == self.group_id:
self.logger.debug(f"Host {self.name}: hostgroup in-sync.") self.logger.debug(f"Host {self.name}: hostgroup in-sync.")
break break
self.logger.warning(f"Host {self.name}: hostgroup OUT of sync.") self.logger.warning(f"Host {self.name}: hostgroup OUT of sync.")
self.updateZabbixHost(groups={'groupid': self.group_id}) self.updateZabbixHost(groups={"groupid": self.group_id})
if int(host["status"]) == self.zabbix_state: if int(host["status"]) == self.zabbix_state:
self.logger.debug(f"Host {self.name}: status in-sync.") self.logger.debug(f"Host {self.name}: status in-sync.")
@ -637,8 +723,10 @@ class PhysicalDevice():
# Check if a proxy has been defined # Check if a proxy has been defined
if self.zbxproxy: if self.zbxproxy:
# Check if proxy or proxy group is defined # Check if proxy or proxy group is defined
if (self.zbxproxy["idtype"] in host and if (
host[self.zbxproxy["idtype"]] == self.zbxproxy["id"]): self.zbxproxy["idtype"] in host
and host[self.zbxproxy["idtype"]] == self.zbxproxy["id"]
):
self.logger.debug(f"Host {self.name}: proxy in-sync.") self.logger.debug(f"Host {self.name}: proxy in-sync.")
# Backwards compatibility for Zabbix <= 6 # Backwards compatibility for Zabbix <= 6
elif "proxy_hostid" in host and host["proxy_hostid"] == self.zbxproxy["id"]: elif "proxy_hostid" in host and host["proxy_hostid"] == self.zbxproxy["id"]:
@ -647,13 +735,15 @@ class PhysicalDevice():
else: else:
self.logger.warning(f"Host {self.name}: proxy OUT of sync.") self.logger.warning(f"Host {self.name}: proxy OUT of sync.")
# Zabbix <= 6 patch # Zabbix <= 6 patch
if not str(self.zabbix.version).startswith('7'): if not str(self.zabbix.version).startswith("7"):
self.updateZabbixHost(proxy_hostid=self.zbxproxy['id']) self.updateZabbixHost(proxy_hostid=self.zbxproxy["id"])
# Zabbix 7+ # Zabbix 7+
else: else:
# Prepare data structure for updating either proxy or group # Prepare data structure for updating either proxy or group
update_data = {self.zbxproxy["idtype"]: self.zbxproxy["id"], update_data = {
"monitored_by": self.zbxproxy['monitored_by']} self.zbxproxy["idtype"]: self.zbxproxy["id"],
"monitored_by": self.zbxproxy["monitored_by"],
}
self.updateZabbixHost(**update_data) self.updateZabbixHost(**update_data)
else: else:
# No proxy is defined in NetBox # No proxy is defined in NetBox
@ -665,8 +755,10 @@ class PhysicalDevice():
proxy_set = True proxy_set = True
if proxy_power and proxy_set: if proxy_power and proxy_set:
# Zabbix <= 6 fix # Zabbix <= 6 fix
self.logger.warning(f"Host {self.name}: no proxy is configured in NetBox " self.logger.warning(
"but is configured in Zabbix. Removing proxy config in Zabbix") f"Host {self.name}: no proxy is configured in NetBox "
"but is configured in Zabbix. Removing proxy config in Zabbix"
)
if "proxy_hostid" in host and bool(host["proxy_hostid"]): if "proxy_hostid" in host and bool(host["proxy_hostid"]):
self.updateZabbixHost(proxy_hostid=0) self.updateZabbixHost(proxy_hostid=0)
# Zabbix 7 proxy # Zabbix 7 proxy
@ -678,21 +770,23 @@ class PhysicalDevice():
# Checks if a proxy has been defined in Zabbix and if proxy_power config has been set # Checks if a proxy has been defined in Zabbix and if proxy_power config has been set
if proxy_set and not proxy_power: if proxy_set and not proxy_power:
# Display error message # Display error message
self.logger.error(f"Host {self.name} is configured " self.logger.error(
f"with proxy in Zabbix but not in NetBox. The" f"Host {self.name} is configured "
" -p flag was ommited: no " f"with proxy in Zabbix but not in NetBox. The"
"changes have been made.") " -p flag was ommited: no "
"changes have been made."
)
if not proxy_set: if not proxy_set:
self.logger.debug(f"Host {self.name}: proxy in-sync.") self.logger.debug(f"Host {self.name}: proxy in-sync.")
# Check host inventory mode # Check host inventory mode
if str(host['inventory_mode']) == str(self.inventory_mode): if str(host["inventory_mode"]) == str(self.inventory_mode):
self.logger.debug(f"Host {self.name}: inventory_mode in-sync.") self.logger.debug(f"Host {self.name}: inventory_mode in-sync.")
else: else:
self.logger.warning(f"Host {self.name}: inventory_mode OUT of sync.") self.logger.warning(f"Host {self.name}: inventory_mode OUT of sync.")
self.updateZabbixHost(inventory_mode=str(self.inventory_mode)) self.updateZabbixHost(inventory_mode=str(self.inventory_mode))
if inventory_sync and self.inventory_mode in [0,1]: if inventory_sync and self.inventory_mode in [0, 1]:
# Check host inventory mapping # Check host inventory mapping
if host['inventory'] == self.inventory: if host["inventory"] == self.inventory:
self.logger.debug(f"Host {self.name}: inventory in-sync.") self.logger.debug(f"Host {self.name}: inventory in-sync.")
else: else:
self.logger.warning(f"Host {self.name}: inventory OUT of sync.") self.logger.warning(f"Host {self.name}: inventory OUT of sync.")
@ -704,12 +798,12 @@ class PhysicalDevice():
# Do not re-sync secret usermacros unless sync is set to 'full' # Do not re-sync secret usermacros unless sync is set to 'full'
if str(usermacro_sync).lower() != "full": if str(usermacro_sync).lower() != "full":
for m in deepcopy(self.usermacros): for m in deepcopy(self.usermacros):
if m['type'] == str(1): if m["type"] == str(1):
# Remove the value as the api doesn't return it # Remove the value as the api doesn't return it
# this will allow us to only update usermacros that don't exist # this will allow us to only update usermacros that don't exist
m.pop('value') m.pop("value")
macros_filtered.append(m) macros_filtered.append(m)
if host['macros'] == self.usermacros or host['macros'] == macros_filtered: if host["macros"] == self.usermacros or host["macros"] == macros_filtered:
self.logger.debug(f"Host {self.name}: usermacros in-sync.") self.logger.debug(f"Host {self.name}: usermacros in-sync.")
else: else:
self.logger.warning(f"Host {self.name}: usermacros OUT of sync.") self.logger.warning(f"Host {self.name}: usermacros OUT of sync.")
@ -717,7 +811,7 @@ class PhysicalDevice():
# Check host usermacros # Check host usermacros
if tag_sync: if tag_sync:
if remove_duplicates(host['tags'],sortkey='tag') == self.tags: if remove_duplicates(host["tags"], sortkey="tag") == self.tags:
self.logger.debug(f"Host {self.name}: tags in-sync.") self.logger.debug(f"Host {self.name}: tags in-sync.")
else: else:
self.logger.warning(f"Host {self.name}: tags OUT of sync.") self.logger.warning(f"Host {self.name}: tags OUT of sync.")
@ -725,7 +819,7 @@ class PhysicalDevice():
# If only 1 interface has been found # If only 1 interface has been found
# pylint: disable=too-many-nested-blocks # pylint: disable=too-many-nested-blocks
if len(host['interfaces']) == 1: if len(host["interfaces"]) == 1:
updates = {} updates = {}
# Go through each key / item and check if it matches Zabbix # Go through each key / item and check if it matches Zabbix
for key, item in self.setInterfaceDetails()[0].items(): for key, item in self.setInterfaceDetails()[0].items():
@ -733,7 +827,7 @@ class PhysicalDevice():
if key in host["interfaces"][0]: if key in host["interfaces"][0]:
# If SNMP is used, go through nested dict # If SNMP is used, go through nested dict
# to compare SNMP parameters # to compare SNMP parameters
if isinstance(item,dict) and key == "details": if isinstance(item, dict) and key == "details":
for k, i in item.items(): for k, i in item.items():
if k in host["interfaces"][0][key]: if k in host["interfaces"][0][key]:
# Set update if values don't match # Set update if values don't match
@ -761,12 +855,14 @@ class PhysicalDevice():
self.logger.warning(f"Host {self.name}: Interface OUT of sync.") self.logger.warning(f"Host {self.name}: Interface OUT of sync.")
if "type" in updates: if "type" in updates:
# Changing interface type not supported. Raise exception. # Changing interface type not supported. Raise exception.
e = (f"Host {self.name}: changing interface type to " e = (
f"{str(updates['type'])} is not supported.") f"Host {self.name}: changing interface type to "
f"{str(updates['type'])} is not supported."
)
self.logger.error(e) self.logger.error(e)
raise InterfaceConfigError(e) raise InterfaceConfigError(e)
# Set interfaceID for Zabbix config # Set interfaceID for Zabbix config
updates["interfaceid"] = host["interfaces"][0]['interfaceid'] updates["interfaceid"] = host["interfaces"][0]["interfaceid"]
try: try:
# API call to Zabbix # API call to Zabbix
self.zabbix.hostinterface.update(updates) self.zabbix.hostinterface.update(updates)
@ -782,9 +878,11 @@ class PhysicalDevice():
e = f"Host {self.name}: interface in-sync." e = f"Host {self.name}: interface in-sync."
self.logger.debug(e) self.logger.debug(e)
else: else:
e = (f"Host {self.name} has unsupported interface configuration." e = (
f" Host has total of {len(host['interfaces'])} interfaces. " f"Host {self.name} has unsupported interface configuration."
"Manual interfention required.") f" Host has total of {len(host['interfaces'])} interfaces. "
"Manual interfention required."
)
self.logger.error(e) self.logger.error(e)
raise SyncInventoryError(e) raise SyncInventoryError(e)
@ -796,20 +894,25 @@ class PhysicalDevice():
if self.journal: if self.journal:
# Check if the severity is valid # Check if the severity is valid
if severity not in ["info", "success", "warning", "danger"]: if severity not in ["info", "success", "warning", "danger"]:
self.logger.warning(f"Value {severity} not valid for NB journal entries.") self.logger.warning(
f"Value {severity} not valid for NB journal entries."
)
return False return False
journal = {"assigned_object_type": "dcim.device", journal = {
"assigned_object_id": self.id, "assigned_object_type": "dcim.device",
"kind": severity, "assigned_object_id": self.id,
"comments": message "kind": severity,
} "comments": message,
}
try: try:
self.nb_journals.create(journal) self.nb_journals.create(journal)
self.logger.debug(f"Host {self.name}: Created journal entry in NetBox") self.logger.debug(f"Host {self.name}: Created journal entry in NetBox")
return True return True
except JournalError(e) as e: except JournalError(e) as e:
self.logger.warning("Unable to create journal entry for " self.logger.warning(
f"{self.name}: NB returned {e}") "Unable to create journal entry for "
f"{self.name}: NB returned {e}"
)
return False return False
return False return False
@ -832,10 +935,15 @@ class PhysicalDevice():
# and add this NB template to the list of successfull templates # and add this NB template to the list of successfull templates
tmpls_from_zabbix.pop(pos) tmpls_from_zabbix.pop(pos)
succesfull_templates.append(nb_tmpl) succesfull_templates.append(nb_tmpl)
self.logger.debug(f"Host {self.name}: template " self.logger.debug(
f"{nb_tmpl['name']} is present in Zabbix.") f"Host {self.name}: template "
f"{nb_tmpl['name']} is present in Zabbix."
)
break break
if len(succesfull_templates) == len(self.zbx_templates) and len(tmpls_from_zabbix) == 0: if (
len(succesfull_templates) == len(self.zbx_templates)
and len(tmpls_from_zabbix) == 0
):
# All of the NetBox templates have been confirmed as successfull # All of the NetBox templates have been confirmed as successfull
# and the ZBX template list is empty. This means that # and the ZBX template list is empty. This means that
# all of the templates match. # all of the templates match.

View File

@ -2,35 +2,47 @@
""" """
All custom exceptions used for Exception generation All custom exceptions used for Exception generation
""" """
class SyncError(Exception): class SyncError(Exception):
""" Class SyncError """ """Class SyncError"""
class JournalError(Exception): class JournalError(Exception):
""" Class SyncError """ """Class SyncError"""
class SyncExternalError(SyncError): class SyncExternalError(SyncError):
""" Class SyncExternalError """ """Class SyncExternalError"""
class SyncInventoryError(SyncError): class SyncInventoryError(SyncError):
""" Class SyncInventoryError """ """Class SyncInventoryError"""
class SyncDuplicateError(SyncError): class SyncDuplicateError(SyncError):
""" Class SyncDuplicateError """ """Class SyncDuplicateError"""
class EnvironmentVarError(SyncError): class EnvironmentVarError(SyncError):
""" Class EnvironmentVarError """ """Class EnvironmentVarError"""
class InterfaceConfigError(SyncError): class InterfaceConfigError(SyncError):
""" Class InterfaceConfigError """ """Class InterfaceConfigError"""
class ProxyConfigError(SyncError): class ProxyConfigError(SyncError):
""" Class ProxyConfigError """ """Class ProxyConfigError"""
class HostgroupError(SyncError): class HostgroupError(SyncError):
""" Class HostgroupError """ """Class HostgroupError"""
class TemplateError(SyncError): class TemplateError(SyncError):
""" Class TemplateError """ """Class TemplateError"""
class UsermacroError(SyncError): class UsermacroError(SyncError):
""" Class UsermacroError """ """Class UsermacroError"""

View File

@ -1,14 +1,26 @@
"""Module for all hostgroup related code""" """Module for all hostgroup related code"""
from logging import getLogger from logging import getLogger
from modules.exceptions import HostgroupError from modules.exceptions import HostgroupError
from modules.tools import build_path from modules.tools import build_path
class Hostgroup():
class Hostgroup:
"""Hostgroup class for devices and VM's """Hostgroup class for devices and VM's
Takes type (vm or dev) and NB object""" Takes type (vm or dev) and NB object"""
def __init__(self, obj_type, nb_obj, version, logger=None, #pylint: disable=too-many-arguments, too-many-positional-arguments
nested_sitegroup_flag=False, nested_region_flag=False, def __init__(
nb_regions=None, nb_groups=None): self,
obj_type,
nb_obj,
version,
logger=None, # pylint: disable=too-many-arguments, too-many-positional-arguments
nested_sitegroup_flag=False,
nested_region_flag=False,
nb_regions=None,
nb_groups=None,
):
self.logger = logger if logger else getLogger(__name__) self.logger = logger if logger else getLogger(__name__)
if obj_type not in ("vm", "dev"): if obj_type not in ("vm", "dev"):
msg = f"Unable to create hostgroup with type {type}" msg = f"Unable to create hostgroup with type {type}"
@ -19,8 +31,9 @@ class Hostgroup():
self.name = self.nb.name self.name = self.nb.name
self.nb_version = version self.nb_version = version
# Used for nested data objects # Used for nested data objects
self.set_nesting(nested_sitegroup_flag, nested_region_flag, self.set_nesting(
nb_groups, nb_regions) nested_sitegroup_flag, nested_region_flag, nb_groups, nb_regions
)
self._set_format_options() self._set_format_options()
def __str__(self): def __str__(self):
@ -49,20 +62,28 @@ class Hostgroup():
format_options["site_group"] = None format_options["site_group"] = None
if self.nb.site: if self.nb.site:
if self.nb.site.region: if self.nb.site.region:
format_options["region"] = self.generate_parents("region", format_options["region"] = self.generate_parents(
str(self.nb.site.region)) "region", str(self.nb.site.region)
)
if self.nb.site.group: if self.nb.site.group:
format_options["site_group"] = self.generate_parents("site_group", format_options["site_group"] = self.generate_parents(
str(self.nb.site.group)) "site_group", str(self.nb.site.group)
)
format_options["role"] = role format_options["role"] = role
format_options["site"] = self.nb.site.name if self.nb.site else None format_options["site"] = self.nb.site.name if self.nb.site else None
format_options["tenant"] = str(self.nb.tenant) if self.nb.tenant else None format_options["tenant"] = str(self.nb.tenant) if self.nb.tenant else None
format_options["tenant_group"] = str(self.nb.tenant.group) if self.nb.tenant else None format_options["tenant_group"] = (
format_options["platform"] = self.nb.platform.name if self.nb.platform else None str(self.nb.tenant.group) if self.nb.tenant else None
)
format_options["platform"] = (
self.nb.platform.name if self.nb.platform else None
)
# Variables only applicable for devices # Variables only applicable for devices
if self.type == "dev": if self.type == "dev":
format_options["manufacturer"] = self.nb.device_type.manufacturer.name format_options["manufacturer"] = self.nb.device_type.manufacturer.name
format_options["location"] = str(self.nb.location) if self.nb.location else None format_options["location"] = (
str(self.nb.location) if self.nb.location else None
)
# Variables only applicable for VM's # Variables only applicable for VM's
if self.type == "vm": if self.type == "vm":
# Check if a cluster is configured. Could also be configured in a site. # Check if a cluster is configured. Could also be configured in a site.
@ -72,17 +93,22 @@ class Hostgroup():
self.format_options = format_options self.format_options = format_options
def set_nesting(self, nested_sitegroup_flag, nested_region_flag, def set_nesting(
nb_groups, nb_regions): self, nested_sitegroup_flag, nested_region_flag, nb_groups, nb_regions
):
"""Set nesting options for this Hostgroup""" """Set nesting options for this Hostgroup"""
self.nested_objects = {"site_group": {"flag": nested_sitegroup_flag, "data": nb_groups}, self.nested_objects = {
"region": {"flag": nested_region_flag, "data": nb_regions}} "site_group": {"flag": nested_sitegroup_flag, "data": nb_groups},
"region": {"flag": nested_region_flag, "data": nb_regions},
}
def generate(self, hg_format=None): def generate(self, hg_format=None):
"""Generate hostgroup based on a provided format""" """Generate hostgroup based on a provided format"""
# Set format to default in case its not specified # Set format to default in case its not specified
if not hg_format: if not hg_format:
hg_format = "site/manufacturer/role" if self.type == "dev" else "cluster/role" hg_format = (
"site/manufacturer/role" if self.type == "dev" else "cluster/role"
)
# Split all given names # Split all given names
hg_output = [] hg_output = []
hg_items = hg_format.split("/") hg_items = hg_format.split("/")
@ -93,8 +119,10 @@ class Hostgroup():
cf_data = self.custom_field_lookup(hg_item) cf_data = self.custom_field_lookup(hg_item)
# CF does not exist # CF does not exist
if not cf_data["result"]: if not cf_data["result"]:
msg = (f"Unable to generate hostgroup for host {self.name}. " msg = (
f"Item type {hg_item} not supported.") f"Unable to generate hostgroup for host {self.name}. "
f"Item type {hg_item} not supported."
)
self.logger.error(msg) self.logger.error(msg)
raise HostgroupError(msg) raise HostgroupError(msg)
# CF data is populated # CF data is populated
@ -109,10 +137,12 @@ class Hostgroup():
# Check if the hostgroup is populated with at least one item. # Check if the hostgroup is populated with at least one item.
if bool(hg_output): if bool(hg_output):
return "/".join(hg_output) return "/".join(hg_output)
msg = (f"Unable to generate hostgroup for host {self.name}." msg = (
" Not enough valid items. This is most likely" f"Unable to generate hostgroup for host {self.name}."
" due to the use of custom fields that are empty" " Not enough valid items. This is most likely"
" or an invalid hostgroup format.") " due to the use of custom fields that are empty"
" or an invalid hostgroup format."
)
self.logger.error(msg) self.logger.error(msg)
raise HostgroupError(msg) raise HostgroupError(msg)
@ -157,7 +187,9 @@ class Hostgroup():
return child_object return child_object
# If the nested flag is True, perform parent calculation # If the nested flag is True, perform parent calculation
if self.nested_objects[nest_type]["flag"]: if self.nested_objects[nest_type]["flag"]:
final_nested_object = build_path(child_object, self.nested_objects[nest_type]["data"]) final_nested_object = build_path(
child_object, self.nested_objects[nest_type]["data"]
)
return "/".join(final_nested_object) return "/".join(final_nested_object)
# Nesting is not allowed for this object. Return child_object # Nesting is not allowed for this object. Return child_object
return child_object return child_object

View File

@ -4,7 +4,8 @@ All of the Zabbix interface related configuration
""" """
from modules.exceptions import InterfaceConfigError from modules.exceptions import InterfaceConfigError
class ZabbixInterface():
class ZabbixInterface:
"""Class that represents a Zabbix interface.""" """Class that represents a Zabbix interface."""
def __init__(self, context, ip): def __init__(self, context, ip):
@ -15,21 +16,16 @@ class ZabbixInterface():
def _set_default_port(self): def _set_default_port(self):
"""Sets default TCP / UDP port for different interface types""" """Sets default TCP / UDP port for different interface types"""
interface_mapping = { interface_mapping = {1: 10050, 2: 161, 3: 623, 4: 12345}
1: 10050,
2: 161,
3: 623,
4: 12345
}
# Check if interface type is listed in mapper. # Check if interface type is listed in mapper.
if self.interface['type'] not in interface_mapping: if self.interface["type"] not in interface_mapping:
return False return False
# Set default port to interface # Set default port to interface
self.interface["port"] = str(interface_mapping[self.interface['type']]) self.interface["port"] = str(interface_mapping[self.interface["type"]])
return True return True
def get_context(self): def get_context(self):
""" check if NetBox custom context has been defined. """ """check if NetBox custom context has been defined."""
if "zabbix" in self.context: if "zabbix" in self.context:
zabbix = self.context["zabbix"] zabbix = self.context["zabbix"]
if "interface_type" in zabbix: if "interface_type" in zabbix:
@ -43,7 +39,7 @@ class ZabbixInterface():
return False return False
def set_snmp(self): def set_snmp(self):
""" Check if interface is type SNMP """ """Check if interface is type SNMP"""
# pylint: disable=too-many-branches # pylint: disable=too-many-branches
if self.interface["type"] == 2: if self.interface["type"] == 2:
# Checks if SNMP settings are defined in NetBox # Checks if SNMP settings are defined in NetBox
@ -63,7 +59,7 @@ class ZabbixInterface():
e = "SNMP version option is not defined." e = "SNMP version option is not defined."
raise InterfaceConfigError(e) raise InterfaceConfigError(e)
# If version 1 or 2 is used, get community string # If version 1 or 2 is used, get community string
if self.interface["details"]["version"] in ['1','2']: if self.interface["details"]["version"] in ["1", "2"]:
if "community" in snmp: if "community" in snmp:
# Set SNMP community to confix context value # Set SNMP community to confix context value
community = snmp["community"] community = snmp["community"]
@ -73,10 +69,16 @@ class ZabbixInterface():
self.interface["details"]["community"] = str(community) self.interface["details"]["community"] = str(community)
# If version 3 has been used, get all # If version 3 has been used, get all
# SNMPv3 NetBox related configs # SNMPv3 NetBox related configs
elif self.interface["details"]["version"] == '3': elif self.interface["details"]["version"] == "3":
items = ["securityname", "securitylevel", "authpassphrase", items = [
"privpassphrase", "authprotocol", "privprotocol", "securityname",
"contextname"] "securitylevel",
"authpassphrase",
"privpassphrase",
"authprotocol",
"privprotocol",
"contextname",
]
for key, item in snmp.items(): for key, item in snmp.items():
if key in items: if key in items:
self.interface["details"][key] = str(item) self.interface["details"][key] = str(item)
@ -91,13 +93,15 @@ class ZabbixInterface():
raise InterfaceConfigError(e) raise InterfaceConfigError(e)
def set_default_snmp(self): def set_default_snmp(self):
""" Set default config to SNMPv2, port 161 and community macro. """ """Set default config to SNMPv2, port 161 and community macro."""
self.interface = self.skelet self.interface = self.skelet
self.interface["type"] = "2" self.interface["type"] = "2"
self.interface["port"] = "161" self.interface["port"] = "161"
self.interface["details"] = {"version": "2", self.interface["details"] = {
"community": "{$SNMP_COMMUNITY}", "version": "2",
"bulk": "1"} "community": "{$SNMP_COMMUNITY}",
"bulk": "1",
}
def set_default_agent(self): def set_default_agent(self):
"""Sets interface to Zabbix agent defaults""" """Sets interface to Zabbix agent defaults"""

View File

@ -4,13 +4,24 @@
All of the Zabbix Usermacro related configuration All of the Zabbix Usermacro related configuration
""" """
from logging import getLogger from logging import getLogger
from modules.tools import field_mapper, remove_duplicates from modules.tools import field_mapper, remove_duplicates
class ZabbixTags():
class ZabbixTags:
"""Class that represents a Zabbix interface.""" """Class that represents a Zabbix interface."""
def __init__(self, nb, tag_map, tag_sync, tag_lower=True, def __init__(
tag_name=None, tag_value=None, logger=None, host=None): self,
nb,
tag_map,
tag_sync,
tag_lower=True,
tag_name=None,
tag_value=None,
logger=None,
host=None,
):
self.nb = nb self.nb = nb
self.name = host if host else nb.name self.name = host if host else nb.name
self.tag_map = tag_map self.tag_map = tag_map
@ -42,7 +53,7 @@ class ZabbixTags():
""" """
Validates tag name Validates tag name
""" """
if tag_name and isinstance(tag_name, str) and len(tag_name)<=256: if tag_name and isinstance(tag_name, str) and len(tag_name) <= 256:
return True return True
return False return False
@ -50,7 +61,7 @@ class ZabbixTags():
""" """
Validates tag value Validates tag value
""" """
if tag_value and isinstance(tag_value, str) and len(tag_value)<=256: if tag_value and isinstance(tag_value, str) and len(tag_value) <= 256:
return True return True
return False return False
@ -58,23 +69,25 @@ class ZabbixTags():
""" """
Renders a tag Renders a tag
""" """
tag={} tag = {}
if self.validate_tag(tag_name): if self.validate_tag(tag_name):
if self.lower: if self.lower:
tag['tag'] = tag_name.lower() tag["tag"] = tag_name.lower()
else: else:
tag['tag'] = tag_name tag["tag"] = tag_name
else: else:
self.logger.error(f'Tag {tag_name} is not a valid tag name, skipping.') self.logger.error(f"Tag {tag_name} is not a valid tag name, skipping.")
return False return False
if self.validate_value(tag_value): if self.validate_value(tag_value):
if self.lower: if self.lower:
tag['value'] = tag_value.lower() tag["value"] = tag_value.lower()
else: else:
tag['value'] = tag_value tag["value"] = tag_value
else: else:
self.logger.error(f'Tag {tag_name} has an invalid value: \'{tag_value}\', skipping.') self.logger.error(
f"Tag {tag_name} has an invalid value: '{tag_value}', skipping."
)
return False return False
return tag return tag
@ -83,7 +96,7 @@ class ZabbixTags():
Generate full set of Usermacros Generate full set of Usermacros
""" """
# pylint: disable=too-many-branches # pylint: disable=too-many-branches
tags=[] tags = []
# Parse the field mapper for tags # Parse the field mapper for tags
if self.tag_map: if self.tag_map:
self.logger.debug(f"Host {self.nb.name}: Starting tag mapper") self.logger.debug(f"Host {self.nb.name}: Starting tag mapper")
@ -94,9 +107,12 @@ class ZabbixTags():
tags.append(t) tags.append(t)
# Parse NetBox config context for tags # Parse NetBox config context for tags
if ("zabbix" in self.nb.config_context and "tags" in self.nb.config_context['zabbix'] if (
and isinstance(self.nb.config_context['zabbix']['tags'], list)): "zabbix" in self.nb.config_context
for tag in self.nb.config_context['zabbix']['tags']: and "tags" in self.nb.config_context["zabbix"]
and isinstance(self.nb.config_context["zabbix"]["tags"], list)
):
for tag in self.nb.config_context["zabbix"]["tags"]:
if isinstance(tag, dict): if isinstance(tag, dict):
for tagname, value in tag.items(): for tagname, value in tag.items():
t = self.render_tag(tagname, value) t = self.render_tag(tagname, value)
@ -106,12 +122,12 @@ class ZabbixTags():
# Pull in NetBox device tags if tag_name is set # Pull in NetBox device tags if tag_name is set
if self.tag_name and isinstance(self.tag_name, str): if self.tag_name and isinstance(self.tag_name, str):
for tag in self.nb.tags: for tag in self.nb.tags:
if self.tag_value.lower() in ['display', 'name', 'slug']: if self.tag_value.lower() in ["display", "name", "slug"]:
value = tag[self.tag_value] value = tag[self.tag_value]
else: else:
value = tag['name'] value = tag["name"]
t = self.render_tag(self.tag_name, value) t = self.render_tag(self.tag_name, value)
if t: if t:
tags.append(t) tags.append(t)
return remove_duplicates(tags, sortkey='tag') return remove_duplicates(tags, sortkey="tag")

View File

@ -1,12 +1,14 @@
"""A collection of tools used by several classes""" """A collection of tools used by several classes"""
def convert_recordset(recordset): def convert_recordset(recordset):
""" Converts netbox RedcordSet to list of dicts. """ """Converts netbox RedcordSet to list of dicts."""
recordlist = [] recordlist = []
for record in recordset: for record in recordset:
recordlist.append(record.__dict__) recordlist.append(record.__dict__)
return recordlist return recordlist
def build_path(endpoint, list_of_dicts): def build_path(endpoint, list_of_dicts):
""" """
Builds a path list of related parent/child items. Builds a path list of related parent/child items.
@ -14,16 +16,17 @@ def build_path(endpoint, list_of_dicts):
be used in hostgroups. be used in hostgroups.
""" """
item_path = [] item_path = []
itemlist = [i for i in list_of_dicts if i['name'] == endpoint] itemlist = [i for i in list_of_dicts if i["name"] == endpoint]
item = itemlist[0] if len(itemlist) == 1 else None item = itemlist[0] if len(itemlist) == 1 else None
item_path.append(item['name']) item_path.append(item["name"])
while item['_depth'] > 0: while item["_depth"] > 0:
itemlist = [i for i in list_of_dicts if i['name'] == str(item['parent'])] itemlist = [i for i in list_of_dicts if i["name"] == str(item["parent"])]
item = itemlist[0] if len(itemlist) == 1 else None item = itemlist[0] if len(itemlist) == 1 else None
item_path.append(item['name']) item_path.append(item["name"])
item_path.reverse() item_path.reverse()
return item_path return item_path
def proxy_prepper(proxy_list, proxy_group_list): def proxy_prepper(proxy_list, proxy_group_list):
""" """
Function that takes 2 lists and converts them using a Function that takes 2 lists and converts them using a
@ -44,15 +47,16 @@ def proxy_prepper(proxy_list, proxy_group_list):
output.append(group) output.append(group)
return output return output
def field_mapper(host, mapper, nbdevice, logger): def field_mapper(host, mapper, nbdevice, logger):
""" """
Maps NetBox field data to Zabbix properties. Maps NetBox field data to Zabbix properties.
Used for Inventory, Usermacros and Tag mappings. Used for Inventory, Usermacros and Tag mappings.
""" """
data={} data = {}
# Let's build an dict for each property in the map # Let's build an dict for each property in the map
for nb_field, zbx_field in mapper.items(): for nb_field, zbx_field in mapper.items():
field_list = nb_field.split("/") # convert str to list based on delimiter field_list = nb_field.split("/") # convert str to list based on delimiter
# start at the base of the dict... # start at the base of the dict...
value = nbdevice value = nbdevice
# ... and step through the dict till we find the needed value # ... and step through the dict till we find the needed value
@ -61,22 +65,30 @@ def field_mapper(host, mapper, nbdevice, logger):
# Check if the result is usable and expected # Check if the result is usable and expected
# We want to apply any int or float 0 values, # We want to apply any int or float 0 values,
# even if python thinks those are empty. # even if python thinks those are empty.
if ((value and isinstance(value, int | float | str )) or if (value and isinstance(value, int | float | str)) or (
(isinstance(value, int | float) and int(value) ==0)): isinstance(value, int | float) and int(value) == 0
):
data[zbx_field] = str(value) data[zbx_field] = str(value)
elif not value: elif not value:
# empty value should just be an empty string for API compatibility # empty value should just be an empty string for API compatibility
logger.debug(f"Host {host}: NetBox lookup for " logger.debug(
f"'{nb_field}' returned an empty value") f"Host {host}: NetBox lookup for "
f"'{nb_field}' returned an empty value"
)
data[zbx_field] = "" data[zbx_field] = ""
else: else:
# Value is not a string or numeral, probably not what the user expected. # Value is not a string or numeral, probably not what the user expected.
logger.error(f"Host {host}: Lookup for '{nb_field}'" logger.error(
" returned an unexpected type: it will be skipped.") f"Host {host}: Lookup for '{nb_field}'"
logger.debug(f"Host {host}: Field mapping complete. " " returned an unexpected type: it will be skipped."
f"Mapped {len(list(filter(None, data.values())))} field(s)") )
logger.debug(
f"Host {host}: Field mapping complete. "
f"Mapped {len(list(filter(None, data.values())))} field(s)"
)
return data return data
def remove_duplicates(input_list, sortkey=None): def remove_duplicates(input_list, sortkey=None):
""" """
Removes duplicate entries from a list and sorts the list Removes duplicate entries from a list and sorts the list

View File

@ -3,11 +3,13 @@
""" """
All of the Zabbix Usermacro related configuration All of the Zabbix Usermacro related configuration
""" """
from re import match
from logging import getLogger from logging import getLogger
from re import match
from modules.tools import field_mapper from modules.tools import field_mapper
class ZabbixUsermacros():
class ZabbixUsermacros:
"""Class that represents a Zabbix interface.""" """Class that represents a Zabbix interface."""
def __init__(self, nb, usermacro_map, usermacro_sync, logger=None, host=None): def __init__(self, nb, usermacro_map, usermacro_sync, logger=None, host=None):
@ -42,40 +44,46 @@ class ZabbixUsermacros():
""" """
Validates usermacro name Validates usermacro name
""" """
pattern = r'\{\$[A-Z0-9\._]*(\:.*)?\}' pattern = r"\{\$[A-Z0-9\._]*(\:.*)?\}"
return match(pattern, macro_name) return match(pattern, macro_name)
def render_macro(self, macro_name, macro_properties): def render_macro(self, macro_name, macro_properties):
""" """
Renders a full usermacro from partial input Renders a full usermacro from partial input
""" """
macro={} macro = {}
macrotypes={'text': 0, 'secret': 1, 'vault': 2} macrotypes = {"text": 0, "secret": 1, "vault": 2}
if self.validate_macro(macro_name): if self.validate_macro(macro_name):
macro['macro'] = str(macro_name) macro["macro"] = str(macro_name)
if isinstance(macro_properties, dict): if isinstance(macro_properties, dict):
if not 'value' in macro_properties: if not "value" in macro_properties:
self.logger.error(f'Usermacro {macro_name} has no value, skipping.') self.logger.error(f"Usermacro {macro_name} has no value, skipping.")
return False return False
macro['value'] = macro_properties['value'] macro["value"] = macro_properties["value"]
if 'type' in macro_properties and macro_properties['type'].lower() in macrotypes: if (
macro['type'] = str(macrotypes[macro_properties['type']]) "type" in macro_properties
and macro_properties["type"].lower() in macrotypes
):
macro["type"] = str(macrotypes[macro_properties["type"]])
else: else:
macro['type'] = str(0) macro["type"] = str(0)
if ('description' in macro_properties and if "description" in macro_properties and isinstance(
isinstance(macro_properties['description'], str)): macro_properties["description"], str
macro['description'] = macro_properties['description'] ):
macro["description"] = macro_properties["description"]
else: else:
macro['description'] = "" macro["description"] = ""
elif isinstance(macro_properties, str): elif isinstance(macro_properties, str):
macro['value'] = macro_properties macro["value"] = macro_properties
macro['type'] = str(0) macro["type"] = str(0)
macro['description'] = "" macro["description"] = ""
else: else:
self.logger.error(f'Usermacro {macro_name} is not a valid usermacro name, skipping.') self.logger.error(
f"Usermacro {macro_name} is not a valid usermacro name, skipping."
)
return False return False
return macro return macro
@ -83,18 +91,25 @@ class ZabbixUsermacros():
""" """
Generate full set of Usermacros Generate full set of Usermacros
""" """
macros=[] macros = []
# Parse the field mapper for usermacros # Parse the field mapper for usermacros
if self.usermacro_map: if self.usermacro_map:
self.logger.debug(f"Host {self.nb.name}: Starting usermacro mapper") self.logger.debug(f"Host {self.nb.name}: Starting usermacro mapper")
field_macros = field_mapper(self.nb.name, self.usermacro_map, self.nb, self.logger) field_macros = field_mapper(
self.nb.name, self.usermacro_map, self.nb, self.logger
)
for macro, value in field_macros.items(): for macro, value in field_macros.items():
m = self.render_macro(macro, value) m = self.render_macro(macro, value)
if m: if m:
macros.append(m) macros.append(m)
# Parse NetBox config context for usermacros # Parse NetBox config context for usermacros
if "zabbix" in self.nb.config_context and "usermacros" in self.nb.config_context['zabbix']: if (
for macro, properties in self.nb.config_context['zabbix']['usermacros'].items(): "zabbix" in self.nb.config_context
and "usermacros" in self.nb.config_context["zabbix"]
):
for macro, properties in self.nb.config_context["zabbix"][
"usermacros"
].items():
m = self.render_macro(macro, properties) m = self.render_macro(macro, properties)
if m: if m:
macros.append(m) macros.append(m)

View File

@ -3,55 +3,66 @@
"""Module that hosts all functions for virtual machine processing""" """Module that hosts all functions for virtual machine processing"""
from os import sys from os import sys
from modules.device import PhysicalDevice from modules.device import PhysicalDevice
from modules.exceptions import InterfaceConfigError, SyncInventoryError, TemplateError
from modules.hostgroups import Hostgroup from modules.hostgroups import Hostgroup
from modules.interface import ZabbixInterface from modules.interface import ZabbixInterface
from modules.exceptions import TemplateError, InterfaceConfigError, SyncInventoryError
try: try:
from config import ( from config import (
vm_inventory_map, traverse_regions,
vm_usermacro_map,
vm_tag_map,
traverse_site_groups, traverse_site_groups,
traverse_regions vm_inventory_map,
vm_tag_map,
vm_usermacro_map,
) )
except ModuleNotFoundError: except ModuleNotFoundError:
print("Configuration file config.py not found in main directory." print(
"Please create the file or rename the config.py.example file to config.py.") "Configuration file config.py not found in main directory."
"Please create the file or rename the config.py.example file to config.py."
)
sys.exit(0) sys.exit(0)
class VirtualMachine(PhysicalDevice): class VirtualMachine(PhysicalDevice):
"""Model for virtual machines""" """Model for virtual machines"""
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.hostgroup = None self.hostgroup = None
self.zbx_template_names = None self.zbx_template_names = None
def _inventory_map(self): def _inventory_map(self):
""" use VM inventory maps """ """use VM inventory maps"""
return vm_inventory_map return vm_inventory_map
def _usermacro_map(self): def _usermacro_map(self):
""" use VM usermacro maps """ """use VM usermacro maps"""
return vm_usermacro_map return vm_usermacro_map
def _tag_map(self): def _tag_map(self):
""" use VM tag maps """ """use VM tag maps"""
return vm_tag_map return vm_tag_map
def set_hostgroup(self, hg_format, nb_site_groups, nb_regions): def set_hostgroup(self, hg_format, nb_site_groups, nb_regions):
"""Set the hostgroup for this device""" """Set the hostgroup for this device"""
# Create new Hostgroup instance # Create new Hostgroup instance
hg = Hostgroup("vm", self.nb, self.nb_api_version, logger=self.logger, hg = Hostgroup(
nested_sitegroup_flag=traverse_site_groups, "vm",
nested_region_flag=traverse_regions, self.nb,
nb_groups=nb_site_groups, self.nb_api_version,
nb_regions=nb_regions) logger=self.logger,
nested_sitegroup_flag=traverse_site_groups,
nested_region_flag=traverse_regions,
nb_groups=nb_site_groups,
nb_regions=nb_regions,
)
# Generate hostgroup based on hostgroup format # Generate hostgroup based on hostgroup format
self.hostgroup = hg.generate(hg_format) self.hostgroup = hg.generate(hg_format)
def set_vm_template(self): def set_vm_template(self):
""" Set Template for VMs. Overwrites default class """Set Template for VMs. Overwrites default class
to skip a lookup of custom fields.""" to skip a lookup of custom fields."""
# Gather templates ONLY from the device specific context # Gather templates ONLY from the device specific context
try: try:
@ -60,7 +71,7 @@ class VirtualMachine(PhysicalDevice):
self.logger.warning(e) self.logger.warning(e)
return True return True
def setInterfaceDetails(self): # pylint: disable=invalid-name def setInterfaceDetails(self): # pylint: disable=invalid-name
""" """
Overwrites device function to select an agent interface type by default Overwrites device function to select an agent interface type by default
Agent type interfaces are more likely to be used with VMs then SNMP Agent type interfaces are more likely to be used with VMs then SNMP