started work on macro support

This commit is contained in:
Raymond Kuiper 2025-02-12 17:43:57 +01:00
parent 4264dc9b31
commit cebefd681e
3 changed files with 81 additions and 4 deletions

View File

@ -8,9 +8,11 @@ from re import search
from logging import getLogger
from zabbix_utils import APIRequestError
from modules.exceptions import (SyncInventoryError, TemplateError, SyncExternalError,
InterfaceConfigError, JournalError)
InterfaceConfigError, JournalError, UsermacroError)
from modules.interface import ZabbixInterface
from modules.usermacros import ZabbixUsermacros
from modules.hostgroups import Hostgroup
from pprint import pprint
try:
from config import (
@ -19,7 +21,8 @@ try:
traverse_regions,
inventory_sync,
inventory_mode,
device_inventory_map
device_inventory_map,
device_usermacro_map
)
except ModuleNotFoundError:
print("Configuration file config.py not found in main directory."
@ -54,6 +57,7 @@ class PhysicalDevice():
self.nb_journals = nb_journal_class
self.inventory_mode = -1
self.inventory = {}
self.usermacros = {}
self.logger = logger if logger else getLogger(__name__)
self._setBasics()
@ -67,6 +71,10 @@ class PhysicalDevice():
""" Use device inventory maps """
return device_inventory_map
def _usermacro_map(self):
""" Use device inventory maps """
return device_usermacro_map
def _setBasics(self):
"""
Sets basic information like IP address.
@ -363,6 +371,19 @@ class PhysicalDevice():
self.logger.warning(message)
raise SyncInventoryError(message) from e
def setUsermacros(self):
try:
# Initiate interface class
macros = ZabbixUsermacros(self.nb.config_context, self._usermacro_map())
if macros.sync == False:
return {}
else:
return [{'macro': '{$USERMACRO}', 'value': '123', 'type': 0, 'description': 'just a test'}]
except UsermacroError as e:
message = f"{self.name}: {e}"
self.logger.warning(message)
raise UsermacroError(message) from e
def setProxy(self, proxy_list):
"""
Sets proxy or proxy group if this
@ -423,6 +444,8 @@ class PhysicalDevice():
groups = [{"groupid": self.group_id}]
# Set Zabbix proxy if defined
self.setProxy(proxies)
# Set usermacros
self.usermacros = self.setUsermacros()
# Set basic data for host creation
create_data = {"host": self.name,
"name": self.visible_name,
@ -432,7 +455,8 @@ class PhysicalDevice():
"templates": templateids,
"description": description,
"inventory_mode": self.inventory_mode,
"inventory": self.inventory
"inventory": self.inventory,
"macros": self.usermacros
}
# If a Zabbix proxy or Zabbix Proxy group has been defined
if self.zbxproxy:
@ -547,7 +571,10 @@ class PhysicalDevice():
selectGroups=["groupid"],
selectHostGroups=["groupid"],
selectParentTemplates=["templateid"],
selectInventory=list(self._inventory_map().values()))
selectInventory=list(self._inventory_map().values()),
selectMacros=["macro","value","type","description"]
)
pprint(host)
if len(host) > 1:
e = (f"Got {len(host)} results for Zabbix hosts "
f"with ID {self.zabbix_id} - hostname {self.name}.")

View File

@ -31,3 +31,6 @@ class HostgroupError(SyncError):
class TemplateError(SyncError):
""" Class TemplateError """
class UsermacroError(SyncError):
""" Class UsermacroError """

47
modules/usermacros.py Normal file
View File

@ -0,0 +1,47 @@
#!/usr/bin/env python3
"""
All of the Zabbix Usermacro related configuration
"""
from logging import getLogger
from zabbix_utils import APIRequestError
from modules.exceptions import UsermacroError
from pprint import pprint
try:
from config import (
usermacro_sync,
)
except ModuleNotFoundError:
print("Configuration file config.py not found in main directory."
"Please create the file or rename the config.py.example file to config.py.")
sys.exit(0)
class ZabbixUsermacros():
"""Class that represents a Zabbix interface."""
def __init__(self, context, usermacro_map, logger=None):
self.context = context
self.usermacro_map = usermacro_map
self.logger = logger if logger else getLogger(__name__)
self.usermacros = {}
self.sync = False
self.force_sync = False
self._setConfig()
def __repr__(self):
return self.name
def __str__(self):
return self.__repr__()
def _setConfig(self):
if str(usermacro_sync) == "full":
self.sync = True
self.force_sync = True
elif usermacro_sync:
self.sync = True
return True