Merge pull request #123 from TheNetworkGuy/develop

Adds unit tests, modular configuration with a default-config fallback, ARM Docker image support, mapping of usermacros, mapping of tags, inventory sync for VMs, and partial support for multiple hostgroups, and fixes several bugs.
Twan Kamans 2025-06-16 11:22:06 +02:00 committed by GitHub
commit 0a20e270ed
27 changed files with 2840 additions and 536 deletions

.devcontainer/devcontainer.json (new file)

@ -0,0 +1,22 @@
// For format details, see https://aka.ms/devcontainer.json. For config options, see the
// README at: https://github.com/devcontainers/templates/tree/main/src/python
{
"name": "Python 3",
// Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile
"image": "mcr.microsoft.com/devcontainers/python:1-3.12-bullseye",
// Features to add to the dev container. More info: https://containers.dev/features.
// "features": {},
// Use 'forwardPorts' to make a list of ports inside the container available locally.
// "forwardPorts": [],
// Use 'postCreateCommand' to run commands after the container is created.
"postCreateCommand": "pip3 install --user -r requirements.txt && pip3 install --user pylint pytest"
// Configure tool-specific properties.
// "customizations": {},
// Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root.
// "remoteUser": "root"
}

.github/workflows/… (Docker image publish workflow)

@ -1,46 +1,49 @@
name: Publish Docker image to GHCR on a new version
---
name: Build and Push Docker Image
on:
push:
branches:
- main
- dockertest
# tags:
# - [0-9]+.*
env:
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}
jobs:
test_quality:
uses: ./.github/workflows/quality.yml
build_and_publish:
build:
runs-on: ubuntu-latest
steps:
- name: Checkout sources
uses: actions/checkout@v4
- name: Log in to the container registry
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GHCR_PAT }}
- name: Extract metadata (tags, labels)
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=semver,pattern={{ version }}
type=ref,event=branch
type=raw,value=latest,enable=${{ github.ref == format('refs/heads/{0}', 'master') }}
type=sha
- name: Build and push Docker image
uses: docker/build-push-action@v5
with:
context: .
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
- name: Checkout repository
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@6524bf65af31da8d45b59e8c27de4bd072b392f5
- name: Login to GitHub Container Registry
uses: docker/login-action@9780b0c442fbb1117ed29e0efdff1e18412f7567
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract metadata
id: meta
uses: docker/metadata-action@369eb591f429131d6889c46b94e711f089e6ca96
with:
images: ghcr.io/${{ github.repository }}
tags: |
type=ref,event=branch
type=ref,event=pr
type=semver,pattern={{version}}
type=semver,pattern={{major}}.{{minor}}
- name: Build and push Docker image
uses: docker/build-push-action@ca877d9245402d1537745e0e356eab47c3520991
with:
context: .
file: ./Dockerfile
push: true
platforms: linux/amd64,linux/arm64
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
annotations: |
index:org.opencontainers.image.description=Python script to synchronise NetBox devices to Zabbix.

.github/workflows/quality.yml

@ -1,15 +1,16 @@
---
name: Pylint Quality control
on:
workflow_call
on:
push:
pull_request:
jobs:
build:
python_quality_testing:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.11","3.12"]
python-version: ["3.12","3.13"]
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
@ -23,4 +24,4 @@ jobs:
pip install -r requirements.txt
- name: Analysing the code with pylint
run: |
pylint --module-naming-style=any $(git ls-files '*.py')
pylint --module-naming-style=any modules/* netbox_zabbix_sync.py

.github/workflows/run_tests.yml (new file, 32 changes)

@ -0,0 +1,32 @@
---
name: Pytest code testing
on:
push:
pull_request:
jobs:
test_code:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: 3.12
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install pytest pytest-mock coverage pytest-cov
pip install -r requirements.txt
- name: Testing the code with PyTest
run: |
cp config.py.example config.py
pytest tests
- name: Run tests with coverage
run: |
cp config.py.example config.py
coverage run -m pytest tests
- name: Check coverage percentage
run: |
coverage report --fail-under=60

.gitignore (5 changes)

@ -1,6 +1,11 @@
*.log
.venv
config.py
Pipfile
Pipfile.lock
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
.vscode
.flake
.coverage

Dockerfile

@ -1,9 +1,20 @@
# syntax=docker/dockerfile:1
FROM python:3.12-alpine
RUN mkdir -p /opt/netbox-zabbix && chown -R 1000:1000 /opt/netbox-zabbix
RUN mkdir -p /opt/netbox-zabbix
COPY . /opt/netbox-zabbix
RUN addgroup -g 1000 -S netbox-zabbix && adduser -u 1000 -S netbox-zabbix -G netbox-zabbix
RUN chown -R 1000:1000 /opt/netbox-zabbix
WORKDIR /opt/netbox-zabbix
COPY --chown=1000:1000 . /opt/netbox-zabbix
USER 1000:1000
RUN if ! [ -f ./config.py ]; then cp ./config.py.example ./config.py; fi
USER root
RUN pip install -r ./requirements.txt
USER 1000:1000
ENTRYPOINT ["python"]
CMD ["/opt/netbox-zabbix/netbox_zabbix_sync.py", "-v"]

README.md (298 changes)

@ -1,14 +1,13 @@
# NetBox to Zabbix synchronization
A script to create, update and delete Zabbix hosts using NetBox device objects.
Currently compatible with Zabbix 7.0. Zabbix 7.2 is unfortunately not supported and will result in the script failing.
A script to create, update and delete Zabbix hosts using NetBox device objects. Tested and compatible with all [currently supported Zabbix releases](https://www.zabbix.com/life_cycle_and_release_policy).
## Installation via Docker
To pull the latest stable version to your local cache, use the following docker
pull command:
```sh
```bash
docker pull ghcr.io/thenetworkguy/netbox-zabbix-sync:main
```
@ -16,7 +15,7 @@ Make sure to specify the needed environment variables for the script to work
(see [here](#set-environment-variables)) on the command line or use an
[env file](https://docs.docker.com/reference/cli/docker/container/run/#env).
```sh
```bash
docker run -d -t -i -e ZABBIX_HOST='https://zabbix.local' \
-e ZABBIX_TOKEN='othersecrettoken' \
-e NETBOX_HOST='https://netbox.local' \
@ -24,14 +23,14 @@ docker run -d -t -i -e ZABBIX_HOST='https://zabbix.local' \
--name netbox-zabbix-sync ghcr.io/thenetworkguy/netbox-zabbix-sync:main
```
This should run a one-time sync, you can check the sync with
This should run a one-time sync. You can check the sync with
`docker logs netbox-zabbix-sync`.
The image uses the default `config.py` for it's configuration, you can use a
The image uses the default `config.py` for its configuration, you can use a
volume mount in the docker run command to override with your own config file if
needed (see [config file](#config-file)):
```sh
```bash
docker run -d -t -i -v $(pwd)/config.py:/opt/netbox-zabbix/config.py ...
```
@ -39,7 +38,7 @@ docker run -d -t -i -v $(pwd)/config.py:/opt/netbox-zabbix/config.py ...
### Cloning the repository
```sh
```bash
git clone https://github.com/TheNetworkGuy/netbox-zabbix-sync.git
```
@ -73,19 +72,19 @@ cp config.py.example config.py
Set the following environment variables:
```sh
export ZABBIX_HOST="https://zabbix.local"
export ZABBIX_USER="username"
export ZABBIX_PASS="Password"
export NETBOX_HOST="https://netbox.local"
export NETBOX_TOKEN="secrettoken"
```bash
ZABBIX_HOST="https://zabbix.local"
ZABBIX_USER="username"
ZABBIX_PASS="Password"
NETBOX_HOST="https://netbox.local"
NETBOX_TOKEN="secrettoken"
```
Or, you can use a Zabbix API token to login instead of using a username and
password. In that case `ZABBIX_USER` and `ZABBIX_PASS` will be ignored.
```sh
export ZABBIX_TOKEN=othersecrettoken
```bash
ZABBIX_TOKEN=othersecrettoken
```
If you are using custom SSL certificates for NetBox and/or Zabbix, you can set
@ -119,8 +118,8 @@ the template information then the zabbix_template field is not required):
You can make the `zabbix_hostid` field hidden or read-only to prevent human
intervention.
This is optional and there is a use case for leaving it read-write in the UI to
manually change the ID. For example to re-run a sync.
This is optional, but there may be cases where you want to leave it
read-write in the UI. For example to manually change or clear the ID and re-run a sync.
## Virtual Machine (VM) Syncing
@ -147,7 +146,7 @@ creation for devices in a new category. I would recommend setting this variable
to `True` since leaving it on `False` results in a lot of manual work.
The format can be set with the `hostgroup_format` variable for devices and
`vm_hostgroup_format` for devices.
`vm_hostgroup_format` for virtual machines.
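For reference, the defaults shipped in `DEFAULT_CONFIG` (see the new `modules/config.py` in this PR) are:

```python
hostgroup_format = "site/manufacturer/role"
vm_hostgroup_format = "cluster_type/cluster/role"
```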
Any nested parent hostgroups will also be created automatically. For instance
the region `Berlin` with parent region `Germany` will create the hostgroup
@ -182,19 +181,27 @@ used:
| ------------ | ------------------------ |
| location | The device location name |
| manufacturer | Device manufacturer name |
| rack | The device rack name |
**Only for VMs**
| name | description |
| ------------ | --------------- |
| cluster | VM cluster name |
| cluster_type | VM cluster type |
| device | Parent device name |
You can specify the value sperated by a "/" like so:
You can specify the value separated by a "/" like so:
```python
hostgroup_format = "tenant/site/location/role"
```
hostgroup_format = "tenant/site/dev_location/role"
```
You can also provide a list of groups like so:
```python
hostgroup_format = ["region/site_group/site", "role", "tenant_group/tenant"]
```
**Group traversal**
@ -239,8 +246,8 @@ have a relationship with a tenant.
- Device_role: PDU
- Site: HQ-AMS
```
hostgroup_format = "site/tenant/device_role"
```python
hostgroup_format = "site/tenant/role"
```
When running the script like above, the following hostgroup (HG) will be
@ -252,7 +259,7 @@ generated for both hosts:
The same logic applies to custom fields being used in the HG format:
```
```python
hostgroup_format = "site/mycustomfieldname"
```
@ -299,18 +306,21 @@ You can set the inventory mode to "disabled", "manual" or "automatic" with the
[Zabbix Manual](https://www.zabbix.com/documentation/current/en/manual/config/hosts/inventory#building-inventory)
for more information about the modes.
Use the `inventory_map` variable to map which NetBox properties are used in
Use the `device_inventory_map` variable to map which NetBox properties are used in
which Zabbix Inventory fields. For nested properties, you can use the '/'
separator. For example, the following map will assign the custom field
'mycustomfield' to the 'alias' Zabbix inventory field:
```
For Virtual Machines, use `vm_inventory_map`.
```python
inventory_sync = True
inventory_mode = "manual"
inventory_map = { "custom_fields/mycustomfield/name": "alias"}
device_inventory_map = {"custom_fields/mycustomfield/name": "alias"}
vm_inventory_map = {"custom_fields/mycustomfield/name": "alias"}
```
See `config.py.example` for an extensive example map. Any Zabix Inventory fields
See `config.py.example` for an extensive example map. Any Zabbix Inventory fields
that are not included in the map will not be touched by the script, so you can
safely add manual values or use items to automatically add values to other
fields.
@ -328,14 +338,14 @@ sticking to the custom field.
You can change the behaviour in the config file. By default this setting is
false but you can set it to true to use config context:
```
```python
templates_config_context = True
```
After that make sure that for each host there is at least one template defined
in the config context in this format:
```
```json
{
"zabbix": {
"templates": [
@ -353,10 +363,196 @@ added benefit of overwriting the template should a device in NetBox have a
device specific context defined. In this case the device specific context
template(s) will take priority over the device type custom field template.
```
```python
templates_config_context_overrule = True
```
### Tags
This script can sync host tags to your Zabbix hosts for use in filtering,
SLA calculations and event correlation.
Tags can be synced from the following sources:
1. NetBox device/vm tags
2. NetBox config context
3. NetBox fields
Syncing tags will override any tags that were set manually on the host,
making NetBox the single source-of-truth for managing tags.
To enable syncing, turn on tag_sync in the config file.
By default, this script will modify tag names and tag values to lowercase.
You can change this behaviour by setting tag_lower to False.
```python
tag_sync = True
tag_lower = True
```
#### Device tags
As NetBox doesn't follow the tag/value pattern for tags, we need a tag name
under which to register the NetBox tags.
By default the tag name is "NetBox", but you can change this to whatever you want.
The value for the tag can be set to 'name', 'display', or 'slug', which refers to the property of the NetBox tag object that will be used as the value in Zabbix.
```python
tag_name = 'NetBox'
tag_value = 'name'
```
#### Config context
You can supply custom tags via config context by adding the following:
```json
{
"zabbix": {
"tags": [
{
"MyTagName": "MyTagValue"
},
{
"environment": "production"
}
],
}
}
```
This will allow you to assign tags based on the config context rules.
#### NetBox Fields
NetBox fields can also be used as input for tags, just like inventory and usermacros.
To enable syncing from fields, make sure to configure a `device_tag_map` and/or a `vm_tag_map`.
```python
device_tag_map = {"site/name": "site",
"rack/name": "rack",
"platform/name": "target"}
vm_tag_map = {"site/name": "site",
"cluster/name": "cluster",
"platform/name": "target"}
```
To turn off field syncing, set the maps to empty dictionaries:
```python
device_tag_map = {}
vm_tag_map = {}
```
### Usermacros
You can choose to use NetBox as a source for Host usermacros by
enabling the following option in the configuration file:
```python
usermacro_sync = True
```
Please be advised that enabling this option will _clear_ any usermacros
manually set on the managed hosts and override them with the usermacros
from NetBox.
There are two NetBox sources that can be used to populate usermacros:
1. NetBox config context
2. NetBox fields
#### Config context
By defining a dictionary `usermacros` within the `zabbix` key in
config context, you can dynamically assign usermacro values based on
anything that you can target with
[config contexts](https://netboxlabs.com/docs/netbox/en/stable/features/context-data/)
within NetBox.
Through this method, it is possible to define the following types of usermacros:
1. Text
2. Secret
3. Vault
The default macro type is text if no `type` and `value` have been set.
It is also possible to create usermacros with
[context](https://www.zabbix.com/documentation/7.0/en/manual/config/macros/user_macros_context).
Examples:
```json
{
"zabbix": {
"usermacros": {
"{$USER_MACRO}": "test value",
"{$CONTEXT_MACRO:\"test\"}": "test value",
"{$CONTEXT_REGEX_MACRO:regex:\".*\"}": "test value",
"{$SECRET_MACRO}": {
"type": "secret",
"value": "PaSsPhRaSe"
},
"{$VAULT_MACRO}": {
"type": "vault",
"value": "secret/vmware:password"
},
"{$USER_MACRO2}": {
"type": "text",
"value": "another test value"
}
}
}
}
```
Please be aware that secret usermacros are only synced _once_ by default.
This is because the Zabbix API won't return the value of secrets, so the
script cannot compare them with the values set in NetBox.
If you update a secret usermacro value, just remove the value from the host
in Zabbix and the new value will be synced during the next run.
Alternatively, you can set the following option in the config file:
```python
usermacro_sync = "full"
```
This will force a full usermacro sync on every run on hosts that have secret usermacros set.
That way, you will know for sure the secret values are always up to date.
Keep in mind that NetBox will show your secrets in plain text.
If true secrecy is required, consider switching to
[vault](https://www.zabbix.com/documentation/current/en/manual/config/macros/secret_macros#vault-secret)
usermacros.
#### NetBox Fields
To use NetBox fields as a source for usermacros, you will need to set up usermacro maps
for devices and/or virtual machines in the configuration file.
This method only supports `text` type usermacros.
For example:
```python
usermacro_sync = True
device_usermacro_map = {"serial": "{$HW_SERIAL}",
"role/name": "{$DEV_ROLE}",
"url": "{$NB_URL}",
"id": "{$NB_ID}"}
vm_usermacro_map = {"memory": "{$TOTAL_MEMORY}",
"role/name": "{$DEV_ROLE}",
"url": "{$NB_URL}",
"id": "{$NB_ID}"}
```
## Permissions
### NetBox
@ -393,9 +589,11 @@ python3 netbox_zabbix_sync.py
### Flags
| Flag | Option | Description |
| ---- | ------- | ---------------------- |
| -v | verbose | Log with debugging on. |
| Flag | Option | Description |
| ---- | --------- | ------------------------------------- |
| -v | verbose | Log with info on. |
| -vv | debug | Log with debugging on. |
| -vvv | debug-all | Log with debugging on for all modules |
## Config context
@ -411,7 +609,7 @@ You can set the proxy for a device using the 'proxy' key in config context.
}
```
It is now posible to specify proxy groups with the introduction of Proxy groups
It is now possible to specify proxy groups with the introduction of Proxy groups
in Zabbix 7. Specifying a group in the config context on older Zabbix releases
will have no impact and the script will ignore the statement.
@ -424,9 +622,9 @@ will have no impact and the script will ignore the statement.
```
The script will prefer groups when specifying both a proxy and group. This is
done with the assumption that groups are more resiliant and HA ready, making it
done with the assumption that groups are more resilient and HA ready, making it
a more logical choice to use for proxy linkage. This also makes migrating from a
proxy to proxy group easier since the group take priority over the invidivual
proxy to proxy group easier since the group takes priority over the individual
proxy.
```json
@ -440,13 +638,7 @@ proxy.
In the example above the host will use the group on Zabbix 7. On Zabbix 6 and
below the host will use the proxy. Zabbix 7 will use the proxy value when
ommiting the proxy_group value.
Because of the possible amount of destruction when setting up NetBox but
forgetting the proxy command, the sync works a bit different. By default
everything is synced except in a situation where the Zabbix host has a proxy
configured but nothing is configured in NetBox. To force deletion and a full
sync, set the `full_proxy_sync` variable in the config file.
omitting the proxy_group value.
### Set interface parameters within NetBox
@ -463,7 +655,7 @@ Due to Zabbix limitations of changing interface type with a linked template,
changing the interface type from within NetBox is not supported and the script
will generate an error.
For example when changing a SNMP interface to an Agent interface:
For example, when changing a SNMP interface to an Agent interface:
```
NetBox-Zabbix-sync - WARNING - Device: Interface OUT of sync.
@ -471,11 +663,11 @@ NetBox-Zabbix-sync - ERROR - Device: changing interface type to 1 is not support
```
To configure the interface parameters you'll need to use custom context. Custom
context was used to make this script as customizable as posible for each
context was used to make this script as customizable as possible for each
environment. For example, you could:
- Set the custom context directly on a device
- Set the custom context on a label, which you would add to a device (for
- Set the custom context on a tag, which you would add to a device (for
instance, SNMPv3)
- Set the custom context on a device role
- Set the custom context on a site or region
@ -525,9 +717,13 @@ environment. For example, you could:
}
```
I would recommend using macros for sensitive data such as community strings
I would recommend using usermacros for sensitive data such as community strings
since the data in NetBox is plain-text.
> **_NOTE:_** Not all SNMP data is required for a working configuration.
> [The following parameters are allowed](https://www.zabbix.com/documentation/current/manual/api/reference/hostinterface/object#details_tag "The following parameters are allowed")but
> [The following parameters are allowed](https://www.zabbix.com/documentation/current/manual/api/reference/hostinterface/object#details_tag "The following parameters are allowed") but
> are not all required, depending on your environment.

config.py.example

@ -80,19 +80,74 @@ inventory_sync = False
# For nested properties, you can use the '/' separator.
# For example, the following map will assign the custom field 'mycustomfield' to the 'alias' Zabbix inventory field:
#
# inventory_map = { "custom_fields/mycustomfield/name": "alias"}
# device_inventory_map = { "custom_fields/mycustomfield/name": "alias"}
#
# The following map should provide some nice defaults:
inventory_map = { "asset_tag": "asset_tag",
"virtual_chassis/name": "chassis",
"status/label": "deployment_status",
"location/name": "location",
"latitude": "location_lat",
"longitude": "location_lon",
"comments": "notes",
"name": "name",
"rack/name": "site_rack",
"serial": "serialno_a",
"device_type/model": "type",
"device_type/manufacturer/name": "vendor",
"oob_ip/address": "oob_ip" }
# The following maps should provide some nice defaults:
device_inventory_map = { "asset_tag": "asset_tag",
"virtual_chassis/name": "chassis",
"status/label": "deployment_status",
"location/name": "location",
"latitude": "location_lat",
"longitude": "location_lon",
"comments": "notes",
"name": "name",
"rack/name": "site_rack",
"serial": "serialno_a",
"device_type/model": "type",
"device_type/manufacturer/name": "vendor",
"oob_ip/address": "oob_ip" }
# We also support inventory mapping on Virtual Machines.
vm_inventory_map = { "status/label": "deployment_status",
"comments": "notes",
"name": "name" }
# To allow syncing of usermacros from NetBox, set to True.
# This will enable both field mapping and config context usermacros.
#
# If set to "full", it will force the update of secret usermacros every run.
# Please see the README.md for more information.
usermacro_sync = False
# device usermacro_map to map NetBox fields to usermacros.
device_usermacro_map = {"serial": "{$HW_SERIAL}",
"role/name": "{$DEV_ROLE}",
"url": "{$NB_URL}",
"id": "{$NB_ID}"}
# virtual machine usermacro_map to map NetBox fields to usermacros.
vm_usermacro_map = {"memory": "{$TOTAL_MEMORY}",
"role/name": "{$DEV_ROLE}",
"url": "{$NB_URL}",
"id": "{$NB_ID}"}
# To sync host tags to Zabbix, set to True.
tag_sync = False
# Setting tag_lower to True will lowercase capital letters in tag names and values.
# This is more in line with the Zabbix way of working with tags.
#
# You can however set this to False to ensure capital letters are synced to Zabbix tags.
tag_lower = True
# We can sync NetBox device/VM tags to Zabbix, but as NetBox tags don't follow the key/value
# pattern, we need to specify a tag name to register the NetBox tags in Zabbix.
#
# If tag_name is set to False, we won't sync NetBox device/VM tags to Zabbix.
tag_name = 'NetBox'
# We can choose to use 'name', 'slug' or 'display' NetBox tag properties as a value in Zabbix.
# 'name' is used by default.
tag_value = "name"
# device tag_map to map NetBox fields to host tags.
device_tag_map = {"site/name": "site",
"rack/name": "rack",
"platform/name": "target"}
# Virtual machine tag_map to map NetBox fields to host tags.
vm_tag_map = {"site/name": "site",
"cluster/name": "cluster",
"platform/name": "target"}

modules/config.py (new file, 121 changes)

@ -0,0 +1,121 @@
"""
Module for parsing configuration from the top level config.py file
"""
from pathlib import Path
from importlib import util
from os import environ
from logging import getLogger
logger = getLogger(__name__)
# PLEASE NOTE: these are the built-in default settings. Please do NOT edit them here!
# Create your own config.py instead; any values set there override these defaults.
DEFAULT_CONFIG = {
"templates_config_context": False,
"templates_config_context_overrule": False,
"template_cf": "zabbix_template",
"device_cf": "zabbix_hostid",
"clustering": False,
"create_hostgroups": True,
"create_journal": False,
"sync_vms": False,
"vm_hostgroup_format": "cluster_type/cluster/role",
"full_proxy_sync": False,
"zabbix_device_removal": ["Decommissioning", "Inventory"],
"zabbix_device_disable": ["Offline", "Planned", "Staged", "Failed"],
"hostgroup_format": "site/manufacturer/role",
"traverse_regions": False,
"traverse_site_groups": False,
"nb_device_filter": {"name__n": "null"},
"nb_vm_filter": {"name__n": "null"},
"inventory_mode": "disabled",
"inventory_sync": False,
"device_inventory_map": {
"asset_tag": "asset_tag",
"virtual_chassis/name": "chassis",
"status/label": "deployment_status",
"location/name": "location",
"latitude": "location_lat",
"longitude": "location_lon",
"comments": "notes",
"name": "name",
"rack/name": "site_rack",
"serial": "serialno_a",
"device_type/model": "type",
"device_type/manufacturer/name": "vendor",
"oob_ip/address": "oob_ip"
},
"vm_inventory_map": {
"status/label": "deployment_status",
"comments": "notes",
"name": "name"
},
"usermacro_sync": False,
"device_usermacro_map": {
"serial": "{$HW_SERIAL}",
"role/name": "{$DEV_ROLE}",
"url": "{$NB_URL}",
"id": "{$NB_ID}"
},
"vm_usermacro_map": {
"memory": "{$TOTAL_MEMORY}",
"role/name": "{$DEV_ROLE}",
"url": "{$NB_URL}",
"id": "{$NB_ID}"
},
"tag_sync": False,
"tag_lower": True,
"tag_name": 'NetBox',
"tag_value": "name",
"device_tag_map": {
"site/name": "site",
"rack/name": "rack",
"platform/name": "target"
},
"vm_tag_map": {
"site/name": "site",
"cluster/name": "cluster",
"platform/name": "target"
}
}
def load_config():
"""Returns combined config from all sources"""
# Overwrite default config with config.py
conf = load_config_file(config_default=DEFAULT_CONFIG)
# Overwrite default config and config.py with environment variables
for key in conf:
value_setting = load_env_variable(key)
if value_setting is not None:
conf[key] = value_setting
return conf
def load_env_variable(config_environvar):
"""Returns config from environment variable"""
prefix = "NBZX_"
config_environvar = prefix + config_environvar.upper()
if config_environvar in environ:
return environ[config_environvar]
return None
def load_config_file(config_default, config_file="config.py"):
"""Returns config from config.py file"""
# Check if config.py exists and load it
# If it does not exist, return the default config
config_path = Path(config_file)
if config_path.exists():
dconf = config_default.copy()
# Dynamically import the config module
spec = util.spec_from_file_location("config", config_path)
config_module = util.module_from_spec(spec)
spec.loader.exec_module(config_module)
# Update DEFAULT_CONFIG with variables from the config module
for key in dconf:
if hasattr(config_module, key):
dconf[key] = getattr(config_module, key)
return dconf
return config_default
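A minimal usage sketch (not part of this PR) of the precedence implemented above: `DEFAULT_CONFIG` values are overridden by `config.py`, which is in turn overridden by `NBZX_`-prefixed environment variables. Note that values read from the environment come back as strings.

```python
from os import environ

from modules.config import load_config

# The environment variable name is the config key upper-cased with the
# NBZX_ prefix, per load_env_variable() above.
environ["NBZX_SYNC_VMS"] = "True"

config = load_config()
print(config["sync_vms"])  # "True" -- a string, not a boolean
```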

modules/device.py

@ -1,38 +1,41 @@
#!/usr/bin/env python3
# pylint: disable=invalid-name, logging-not-lazy, too-many-locals, logging-fstring-interpolation, too-many-lines
# pylint: disable=invalid-name, logging-not-lazy, too-many-locals, logging-fstring-interpolation, too-many-lines, too-many-public-methods, duplicate-code
"""
Device specific handling for NetBox to Zabbix
"""
from os import sys
from re import search
from logging import getLogger
from zabbix_utils import APIRequestError
from modules.exceptions import (SyncInventoryError, TemplateError, SyncExternalError,
InterfaceConfigError, JournalError)
from modules.interface import ZabbixInterface
from modules.hostgroups import Hostgroup
try:
from config import (
template_cf, device_cf,
traverse_site_groups,
traverse_regions,
inventory_sync,
inventory_mode,
inventory_map
)
except ModuleNotFoundError:
print("Configuration file config.py not found in main directory."
"Please create the file or rename the config.py.example file to config.py.")
sys.exit(0)
class PhysicalDevice():
from copy import deepcopy
from logging import getLogger
from re import search
from operator import itemgetter
from zabbix_utils import APIRequestError
from pynetbox import RequestError as NetboxRequestError
from modules.exceptions import (
InterfaceConfigError,
SyncExternalError,
SyncInventoryError,
TemplateError,
)
from modules.hostgroups import Hostgroup
from modules.interface import ZabbixInterface
from modules.tags import ZabbixTags
from modules.tools import field_mapper, remove_duplicates, sanatize_log_output
from modules.usermacros import ZabbixUsermacros
from modules.config import load_config
config = load_config()
class PhysicalDevice:
# pylint: disable=too-many-instance-attributes, too-many-arguments, too-many-positional-arguments
"""
Represents Network device.
INPUT: (NetBox device class, ZabbixAPI class, journal flag, NB journal class)
"""
def __init__(self, nb, zabbix, nb_journal_class, nb_version, journal=None, logger=None):
def __init__(
self, nb, zabbix, nb_journal_class, nb_version, journal=None, logger=None
):
self.nb = nb
self.id = nb.id
self.name = nb.name
@ -40,11 +43,11 @@ class PhysicalDevice():
self.status = nb.status.label
self.zabbix = zabbix
self.zabbix_id = None
self.group_id = None
self.group_ids = []
self.nb_api_version = nb_version
self.zbx_template_names = []
self.zbx_templates = []
self.hostgroup = None
self.hostgroups = []
self.tenant = nb.tenant
self.config_context = nb.config_context
self.zbxproxy = None
@ -53,6 +56,8 @@ class PhysicalDevice():
self.nb_journals = nb_journal_class
self.inventory_mode = -1
self.inventory = {}
self.usermacros = []
self.tags = {}
self.logger = logger if logger else getLogger(__name__)
self._setBasics()
@ -62,6 +67,18 @@ class PhysicalDevice():
def __str__(self):
return self.__repr__()
def _inventory_map(self):
"""Use device inventory maps"""
return config["device_inventory_map"]
def _usermacro_map(self):
"""Use device inventory maps"""
return config["device_usermacro_map"]
def _tag_map(self):
"""Use device host tag maps"""
return config["device_tag_map"]
def _setBasics(self):
"""
Sets basic information like IP address.
@ -72,44 +89,55 @@ class PhysicalDevice():
self.ip = self.cidr.split("/")[0]
else:
e = f"Host {self.name}: no primary IP."
self.logger.info(e)
self.logger.warning(e)
raise SyncInventoryError(e)
# Check if device has custom field for ZBX ID
if device_cf in self.nb.custom_fields:
self.zabbix_id = self.nb.custom_fields[device_cf]
if config["device_cf"] in self.nb.custom_fields:
self.zabbix_id = self.nb.custom_fields[config["device_cf"]]
else:
e = f"Host {self.name}: Custom field {device_cf} not present"
e = f'Host {self.name}: Custom field {config["device_cf"]} not present'
self.logger.warning(e)
raise SyncInventoryError(e)
# Validate hostname format.
odd_character_list = ["ä", "ö", "ü", "Ä", "Ö", "Ü", "ß"]
self.use_visible_name = False
if (any(letter in self.name for letter in odd_character_list) or
bool(search('[\u0400-\u04FF]', self.name))):
if any(letter in self.name for letter in odd_character_list) or bool(
search("[\u0400-\u04ff]", self.name)
):
self.name = f"NETBOX_ID{self.id}"
self.visible_name = self.nb.name
self.use_visible_name = True
self.logger.info(f"Host {self.visible_name} contains special characters. "
f"Using {self.name} as name for the NetBox object "
f"and using {self.visible_name} as visible name in Zabbix.")
self.logger.info(
f"Host {self.visible_name} contains special characters. "
f"Using {self.name} as name for the NetBox object "
f"and using {self.visible_name} as visible name in Zabbix."
)
else:
pass
def set_hostgroup(self, hg_format, nb_site_groups, nb_regions):
"""Set the hostgroup for this device"""
# Create new Hostgroup instance
hg = Hostgroup("dev", self.nb, self.nb_api_version, logger=self.logger,
nested_sitegroup_flag=traverse_site_groups,
nested_region_flag=traverse_regions,
nb_groups=nb_site_groups,
nb_regions=nb_regions)
hg = Hostgroup(
"dev",
self.nb,
self.nb_api_version,
logger=self.logger,
nested_sitegroup_flag=config['traverse_site_groups'],
nested_region_flag=config['traverse_regions'],
nb_groups=nb_site_groups,
nb_regions=nb_regions,
)
# Generate hostgroup based on hostgroup format
self.hostgroup = hg.generate(hg_format)
if isinstance(hg_format, list):
self.hostgroups = [hg.generate(f) for f in hg_format]
else:
self.hostgroups.append(hg.generate(hg_format))
def set_template(self, prefer_config_context, overrule_custom):
""" Set Template """
"""Set Template"""
self.zbx_template_names = None
# Gather templates ONLY from the device specific context
if prefer_config_context:
@ -133,28 +161,37 @@ class PhysicalDevice():
return True
def get_templates_cf(self):
""" Get template from custom field """
"""Get template from custom field"""
# Get Zabbix templates from the device type
device_type_cfs = self.nb.device_type.custom_fields
# Check if the ZBX Template CF is present
if template_cf in device_type_cfs:
if config["template_cf"] in device_type_cfs:
# Set value to template
return [device_type_cfs[template_cf]]
return [device_type_cfs[config["template_cf"]]]
# Custom field not found, return error
e = (f"Custom field {template_cf} not "
e = (
f"Custom field {config['template_cf']} not "
f"found for {self.nb.device_type.manufacturer.name}"
f" - {self.nb.device_type.display}.")
f" - {self.nb.device_type.display}."
)
self.logger.warning(e)
raise TemplateError(e)
def get_templates_context(self):
""" Get Zabbix templates from the device context """
"""Get Zabbix templates from the device context"""
if "zabbix" not in self.config_context:
e = (f"Host {self.name}: Key 'zabbix' not found in config "
"context for template lookup")
e = (
f"Host {self.name}: Key 'zabbix' not found in config "
"context for template lookup"
)
raise TemplateError(e)
if "templates" not in self.config_context["zabbix"]:
e = (f"Host {self.name}: Key 'templates' not found in config "
"context 'zabbix' for template lookup")
e = (
f"Host {self.name}: Key 'templates' not found in config "
"context 'zabbix' for template lookup"
)
raise TemplateError(e)
# Check if format is list or string.
if isinstance(self.config_context["zabbix"]["templates"], str):
@ -162,49 +199,30 @@ class PhysicalDevice():
return self.config_context["zabbix"]["templates"]
def set_inventory(self, nbdevice):
""" Set host inventory """
"""Set host inventory"""
# Set inventory mode. Default is disabled (see class init function).
if inventory_mode == "disabled":
if inventory_sync:
if config["inventory_mode"] == "disabled":
if config["inventory_sync"]:
self.logger.error(f"Host {self.name}: Unable to map NetBox inventory to Zabbix. "
"Inventory sync is enabled in config but inventory mode is disabled.")
"Inventory sync is enabled in "
"config but inventory mode is disabled.")
return True
if inventory_mode == "manual":
if config["inventory_mode"] == "manual":
self.inventory_mode = 0
elif inventory_mode == "automatic":
elif config["inventory_mode"] == "automatic":
self.inventory_mode = 1
else:
self.logger.error(f"Host {self.name}: Specified value for inventory mode in"
f" config is not valid. Got value {inventory_mode}")
self.logger.error(
f"Host {self.name}: Specified value for inventory mode in"
f" config is not valid. Got value {config['inventory_mode']}"
)
return False
self.inventory = {}
if inventory_sync and self.inventory_mode in [0,1]:
if config["inventory_sync"] and self.inventory_mode in [0, 1]:
self.logger.debug(f"Host {self.name}: Starting inventory mapper")
# Let's build an inventory dict for each property in the inventory_map
for nb_inv_field, zbx_inv_field in inventory_map.items():
field_list = nb_inv_field.split("/") # convert str to list based on delimiter
# start at the base of the dict...
value = nbdevice
# ... and step through the dict till we find the needed value
for item in field_list:
value = value[item] if value else None
# Check if the result is usable and expected
# We want to apply any int or float 0 values,
# even if python thinks those are empty.
if ((value and isinstance(value, int | float | str )) or
(isinstance(value, int | float) and int(value) ==0)):
self.inventory[zbx_inv_field] = str(value)
elif not value:
# empty value should just be an empty string for API compatibility
self.logger.debug(f"Host {self.name}: NetBox inventory lookup for "
f"'{nb_inv_field}' returned an empty value")
self.inventory[zbx_inv_field] = ""
else:
# Value is not a string or numeral, probably not what the user expected.
self.logger.error(f"Host {self.name}: Inventory lookup for '{nb_inv_field}'"
" returned an unexpected type: it will be skipped.")
self.logger.debug(f"Host {self.name}: Inventory mapping complete. "
f"Mapped {len(list(filter(None, self.inventory.values())))} field(s)")
self.inventory = field_mapper(
self.name, self._inventory_map(), nbdevice, self.logger
)
return True
def isCluster(self):
@ -218,13 +236,17 @@ class PhysicalDevice():
Returns chassis master ID.
"""
if not self.isCluster():
e = (f"Unable to proces {self.name} for cluster calculation: "
f"not part of a cluster.")
e = (
f"Unable to proces {self.name} for cluster calculation: "
f"not part of a cluster."
)
self.logger.warning(e)
raise SyncInventoryError(e)
if not self.nb.virtual_chassis.master:
e = (f"{self.name} is part of a NetBox virtual chassis which does "
"not have a master configured. Skipping for this reason.")
e = (
f"{self.name} is part of a NetBox virtual chassis which does "
"not have a master configured. Skipping for this reason."
)
self.logger.error(e)
raise SyncInventoryError(e)
return self.nb.virtual_chassis.master.id
@ -237,9 +259,11 @@ class PhysicalDevice():
"""
masterid = self.getClusterMaster()
if masterid == self.id:
self.logger.debug(f"Host {self.name} is primary cluster member. "
f"Modifying hostname from {self.name} to " +
f"{self.nb.virtual_chassis.name}.")
self.logger.debug(
f"Host {self.name} is primary cluster member. "
f"Modifying hostname from {self.name} to "
+ f"{self.nb.virtual_chassis.name}."
)
self.name = self.nb.virtual_chassis.name
return True
self.logger.debug(f"Host {self.name} is non-primary cluster member.")
@ -264,18 +288,24 @@ class PhysicalDevice():
# Go through all templates found in Zabbix
for zbx_template in templates:
# If the template names match
if zbx_template['name'] == nb_template:
if zbx_template["name"] == nb_template:
# Set match variable to true, add template details
# to class variable and return debug log
template_match = True
self.zbx_templates.append({"templateid": zbx_template['templateid'],
"name": zbx_template['name']})
self.zbx_templates.append(
{
"templateid": zbx_template["templateid"],
"name": zbx_template["name"],
}
)
e = f"Host {self.name}: found template {zbx_template['name']}"
self.logger.debug(e)
# Return error should the template not be found in Zabbix
if not template_match:
e = (f"Unable to find template {nb_template} "
f"for host {self.name} in Zabbix. Skipping host...")
e = (
f"Unable to find template {nb_template} "
f"for host {self.name} in Zabbix. Skipping host..."
)
self.logger.warning(e)
raise SyncInventoryError(e)
@ -286,12 +316,17 @@ class PhysicalDevice():
OUTPUT: True / False
"""
# Go through all groups
for group in groups:
if group['name'] == self.hostgroup:
self.group_id = group['groupid']
e = f"Host {self.name}: matched group {group['name']}"
self.logger.debug(e)
return True
for hg in self.hostgroups:
for group in groups:
if group["name"] == hg:
self.group_ids.append({"groupid": group["groupid"]})
e = (
f"Host {self.name}: matched group "
f"\"{group['name']}\" (ID:{group['groupid']})"
)
self.logger.debug(e)
if len(self.group_ids) == len(self.hostgroups):
return True
return False
def cleanup(self):
@ -302,10 +337,13 @@ class PhysicalDevice():
if self.zabbix_id:
try:
# Check if the Zabbix host exists in Zabbix
zbx_host = bool(self.zabbix.host.get(filter={'hostid': self.zabbix_id},
output=[]))
e = (f"Host {self.name}: was already deleted from Zabbix."
" Removed link in NetBox.")
zbx_host = bool(
self.zabbix.host.get(filter={"hostid": self.zabbix_id}, output=[])
)
e = (
f"Host {self.name}: was already deleted from Zabbix."
" Removed link in NetBox."
)
if zbx_host:
# Delete host should it exists
self.zabbix.host.delete(self.zabbix_id)
@ -321,7 +359,7 @@ class PhysicalDevice():
def _zeroize_cf(self):
"""Sets the hostID custom field in NetBox to zero,
effectively destroying the link"""
self.nb.custom_fields[device_cf] = None
self.nb.custom_fields[config["device_cf"]] = None
self.nb.save()
def _zabbixHostnameExists(self):
@ -330,9 +368,9 @@ class PhysicalDevice():
"""
# Validate the hostname or visible name field
if not self.use_visible_name:
zbx_filter = {'host': self.name}
zbx_filter = {"host": self.name}
else:
zbx_filter = {'name': self.visible_name}
zbx_filter = {"name": self.visible_name}
host = self.zabbix.host.get(filter=zbx_filter, output=[])
return bool(host)
@ -358,6 +396,44 @@ class PhysicalDevice():
self.logger.warning(message)
raise SyncInventoryError(message) from e
def set_usermacros(self):
"""
Generates Usermacros
"""
macros = ZabbixUsermacros(
self.nb,
self._usermacro_map(),
config['usermacro_sync'],
logger=self.logger,
host=self.name,
)
if macros.sync is False:
self.usermacros = []
return True
self.usermacros = macros.generate()
return True
def set_tags(self):
"""
Generates Host Tags
"""
tags = ZabbixTags(
self.nb,
self._tag_map(),
config['tag_sync'],
config['tag_lower'],
tag_name=config['tag_name'],
tag_value=config['tag_value'],
logger=self.logger,
host=self.name,
)
if tags.sync is False:
self.tags = []
return True
self.tags = tags.generate()
return True
def setProxy(self, proxy_list):
"""
Sets proxy or proxy group if this
@ -366,16 +442,18 @@ class PhysicalDevice():
input: List of all proxies and proxy groups in standardized format
"""
# check if the key Zabbix is defined in the config context
if not "zabbix" in self.nb.config_context:
if "zabbix" not in self.nb.config_context:
return False
if ("proxy" in self.nb.config_context["zabbix"] and
not self.nb.config_context["zabbix"]["proxy"]):
if (
"proxy" in self.nb.config_context["zabbix"]
and not self.nb.config_context["zabbix"]["proxy"]
):
return False
# Proxy group takes priority over a proxy due
# to it being HA and therefore being more reliable
# Includes proxy group fix since Zabbix <= 6 should ignore this
proxy_types = ["proxy"]
if str(self.zabbix.version).startswith('7'):
if str(self.zabbix.version).startswith("7"):
# Only insert groups in front of list for Zabbix7
proxy_types.insert(0, "proxy_group")
for proxy_type in proxy_types:
@ -389,15 +467,23 @@ class PhysicalDevice():
continue
# If the proxy name matches
if proxy["name"] == proxy_name:
self.logger.debug(f"Host {self.name}: using {proxy['type']}"
f" {proxy_name}")
self.logger.debug(
f"Host {self.name}: using {proxy['type']}" f" {proxy_name}"
)
self.zbxproxy = proxy
return True
self.logger.warning(f"Host {self.name}: unable to find proxy {proxy_name}")
self.logger.warning(
f"Host {self.name}: unable to find proxy {proxy_name}"
)
return False
def createInZabbix(self, groups, templates, proxies,
description="Host added by NetBox sync script."):
def createInZabbix(
self,
groups,
templates,
proxies,
description="Host added by NetBox sync script.",
):
"""
Creates Zabbix host object with parameters from NetBox object.
"""
@ -405,35 +491,40 @@ class PhysicalDevice():
if not self._zabbixHostnameExists():
# Set group and template ID's for host
if not self.setZabbixGroupID(groups):
e = (f"Unable to find group '{self.hostgroup}' "
f"for host {self.name} in Zabbix.")
e = (
f"Unable to find group '{self.hostgroup}' "
f"for host {self.name} in Zabbix."
)
self.logger.warning(e)
raise SyncInventoryError(e)
self.zbxTemplatePrepper(templates)
templateids = []
for template in self.zbx_templates:
templateids.append({'templateid': template['templateid']})
templateids.append({"templateid": template["templateid"]})
# Set interface, group and template configuration
interfaces = self.setInterfaceDetails()
groups = [{"groupid": self.group_id}]
groups = self.group_ids
# Set Zabbix proxy if defined
self.setProxy(proxies)
# Set basic data for host creation
create_data = {"host": self.name,
"name": self.visible_name,
"status": self.zabbix_state,
"interfaces": interfaces,
"groups": groups,
"templates": templateids,
"description": description,
"inventory_mode": self.inventory_mode,
"inventory": self.inventory
}
create_data = {
"host": self.name,
"name": self.visible_name,
"status": self.zabbix_state,
"interfaces": interfaces,
"groups": groups,
"templates": templateids,
"description": description,
"inventory_mode": self.inventory_mode,
"inventory": self.inventory,
"macros": self.usermacros,
"tags": self.tags,
}
# If a Zabbix proxy or Zabbix Proxy group has been defined
if self.zbxproxy:
# If a lower version than 7 is used, we can assume that
# the proxy is a normal proxy and not a proxy group
if not str(self.zabbix.version).startswith('7'):
if not str(self.zabbix.version).startswith("7"):
create_data["proxy_hostid"] = self.zbxproxy["id"]
else:
# Configure either a proxy or proxy group
@ -444,18 +535,19 @@ class PhysicalDevice():
host = self.zabbix.host.create(**create_data)
self.zabbix_id = host["hostids"][0]
except APIRequestError as e:
e = f"Host {self.name}: Couldn't create. Zabbix returned {str(e)}."
self.logger.error(e)
raise SyncExternalError(e) from None
msg = f"Host {self.name}: Couldn't create. Zabbix returned {str(e)}."
self.logger.error(msg)
raise SyncExternalError(msg) from e
# Set NetBox custom field to hostID value.
self.nb.custom_fields[device_cf] = int(self.zabbix_id)
self.nb.custom_fields[config["device_cf"]] = int(self.zabbix_id)
self.nb.save()
msg = f"Host {self.name}: Created host in Zabbix."
self.logger.info(msg)
self.create_journal_entry("success", msg)
else:
e = f"Host {self.name}: Unable to add to Zabbix. Host already present."
self.logger.warning(e)
self.logger.error(
f"Host {self.name}: Unable to add to Zabbix. Host already present."
)
def createZabbixHostgroup(self, hostgroups):
"""
@ -464,23 +556,26 @@ class PhysicalDevice():
"""
final_data = []
# Check if the hostgroup is in a nested format and check each parent
for pos in range(len(self.hostgroup.split('/'))):
zabbix_hg = self.hostgroup.rsplit('/', pos)[0]
if self.lookupZabbixHostgroup(hostgroups, zabbix_hg):
# Hostgroup already exists
continue
# Create new group
try:
# API call to Zabbix
groupid = self.zabbix.hostgroup.create(name=zabbix_hg)
e = f"Hostgroup '{zabbix_hg}': created in Zabbix."
self.logger.info(e)
# Add group to final data
final_data.append({'groupid': groupid["groupids"][0], 'name': zabbix_hg})
except APIRequestError as e:
msg = f"Hostgroup '{zabbix_hg}': unable to create. Zabbix returned {str(e)}."
self.logger.error(msg)
raise SyncExternalError(msg) from e
for hostgroup in self.hostgroups:
for pos in range(len(hostgroup.split("/"))):
zabbix_hg = hostgroup.rsplit("/", pos)[0]
if self.lookupZabbixHostgroup(hostgroups, zabbix_hg):
# Hostgroup already exists
continue
# Create new group
try:
# API call to Zabbix
groupid = self.zabbix.hostgroup.create(name=zabbix_hg)
e = f"Hostgroup '{zabbix_hg}': created in Zabbix."
self.logger.info(e)
# Add group to final data
final_data.append(
{"groupid": groupid["groupids"][0], "name": zabbix_hg}
)
except APIRequestError as e:
msg = f"Hostgroup '{zabbix_hg}': unable to create. Zabbix returned {str(e)}."
self.logger.error(msg)
raise SyncExternalError(msg) from e
return final_data
def lookupZabbixHostgroup(self, group_list, lookup_group):
@ -503,20 +598,24 @@ class PhysicalDevice():
try:
self.zabbix.host.update(hostid=self.zabbix_id, **kwargs)
except APIRequestError as e:
e = (f"Host {self.name}: Unable to update. "
f"Zabbix returned the following error: {str(e)}.")
e = (
f"Host {self.name}: Unable to update. "
f"Zabbix returned the following error: {str(e)}."
)
self.logger.error(e)
raise SyncExternalError(e) from None
self.logger.info(f"Updated host {self.name} with data {kwargs}.")
self.logger.info(f"Host {self.name}: updated with data {sanatize_log_output(kwargs)}.")
self.create_journal_entry("info", "Updated host in Zabbix with latest NB data.")
def ConsistencyCheck(self, groups, templates, proxies, proxy_power, create_hostgroups):
def ConsistencyCheck(
self, groups, templates, proxies, proxy_power, create_hostgroups
):
# pylint: disable=too-many-branches, too-many-statements
"""
Checks if Zabbix object is still valid with NetBox parameters.
"""
# If group is found or if the hostgroup is nested
if not self.setZabbixGroupID(groups) or len(self.hostgroup.split('/')) > 1:
if not self.setZabbixGroupID(groups): # or len(self.hostgroups.split("/")) > 1:
if create_hostgroups:
# Script is allowed to create a new hostgroup
new_groups = self.createZabbixHostgroup(groups)
@ -524,50 +623,66 @@ class PhysicalDevice():
# Add all new groups to the list of groups
groups.append(group)
# check if the initial group was not already found (and this is a nested folder check)
if not self.group_id:
if not self.group_ids:
# Function returns true / false but also sets GroupID
if not self.setZabbixGroupID(groups) and not create_hostgroups:
e = (f"Host {self.name}: different hostgroup is required but "
"unable to create hostgroup without generation permission.")
e = (
f"Host {self.name}: different hostgroup is required but "
"unable to create hostgroup without generation permission."
)
self.logger.warning(e)
raise SyncInventoryError(e)
#if self.group_ids:
# self.group_ids.append(self.pri_group_id)
# Prepare templates and proxy config
self.zbxTemplatePrepper(templates)
self.setProxy(proxies)
# Get host object from Zabbix
host = self.zabbix.host.get(filter={'hostid': self.zabbix_id},
selectInterfaces=['type', 'ip',
'port', 'details',
'interfaceid'],
selectGroups=["groupid"],
selectHostGroups=["groupid"],
selectParentTemplates=["templateid"],
selectInventory=list(inventory_map.values()))
host = self.zabbix.host.get(
filter={"hostid": self.zabbix_id},
selectInterfaces=["type", "ip", "port", "details", "interfaceid"],
selectGroups=["groupid"],
selectHostGroups=["groupid"],
selectParentTemplates=["templateid"],
selectInventory=list(self._inventory_map().values()),
selectMacros=["macro", "value", "type", "description"],
selectTags=["tag", "value"],
)
if len(host) > 1:
e = (f"Got {len(host)} results for Zabbix hosts "
f"with ID {self.zabbix_id} - hostname {self.name}.")
e = (
f"Got {len(host)} results for Zabbix hosts "
f"with ID {self.zabbix_id} - hostname {self.name}."
)
self.logger.error(e)
raise SyncInventoryError(e)
if len(host) == 0:
e = (f"Host {self.name}: No Zabbix host found. "
f"This is likely the result of a deleted Zabbix host "
f"without zeroing the ID field in NetBox.")
e = (
f"Host {self.name}: No Zabbix host found. "
f"This is likely the result of a deleted Zabbix host "
f"without zeroing the ID field in NetBox."
)
self.logger.error(e)
raise SyncInventoryError(e)
host = host[0]
if host["host"] == self.name:
self.logger.debug(f"Host {self.name}: hostname in-sync.")
else:
self.logger.warning(f"Host {self.name}: hostname OUT of sync. "
f"Received value: {host['host']}")
self.logger.warning(
f"Host {self.name}: hostname OUT of sync. "
f"Received value: {host['host']}"
)
self.updateZabbixHost(host=self.name)
# Execute check depending on wether the name is special or not
if self.use_visible_name:
if host["name"] == self.visible_name:
self.logger.debug(f"Host {self.name}: visible name in-sync.")
else:
self.logger.warning(f"Host {self.name}: visible name OUT of sync."
f" Received value: {host['name']}")
self.logger.warning(
f"Host {self.name}: visible name OUT of sync."
f" Received value: {host['name']}"
)
self.updateZabbixHost(name=self.visible_name)
# Check if the templates are in-sync
@ -576,35 +691,37 @@ class PhysicalDevice():
# Prepare Templates for API parsing
templateids = []
for template in self.zbx_templates:
templateids.append({'templateid': template['templateid']})
templateids.append({"templateid": template["templateid"]})
# Update Zabbix with NB templates and clear any old / lost templates
self.updateZabbixHost(templates_clear=host["parentTemplates"],
templates=templateids)
self.updateZabbixHost(
templates_clear=host["parentTemplates"], templates=templateids
)
else:
self.logger.debug(f"Host {self.name}: template(s) in-sync.")
# Check if Zabbix version is 6 or higher. Issue #93
group_dictname = "hostgroups"
if str(self.zabbix.version).startswith(('6', '5')):
if str(self.zabbix.version).startswith(("6", "5")):
group_dictname = "groups"
for group in host[group_dictname]:
if group["groupid"] == self.group_id:
self.logger.debug(f"Host {self.name}: hostgroup in-sync.")
break
# Check if hostgroups match
if (sorted(host[group_dictname], key=itemgetter('groupid')) ==
sorted(self.group_ids, key=itemgetter('groupid'))):
self.logger.debug(f"Host {self.name}: hostgroups in-sync.")
else:
self.logger.warning(f"Host {self.name}: hostgroup OUT of sync.")
self.updateZabbixHost(groups={'groupid': self.group_id})
self.logger.warning(f"Host {self.name}: hostgroups OUT of sync.")
self.updateZabbixHost(groups=self.group_ids)
if int(host["status"]) == self.zabbix_state:
self.logger.debug(f"Host {self.name}: status in-sync.")
else:
self.logger.warning(f"Host {self.name}: status OUT of sync.")
self.updateZabbixHost(status=str(self.zabbix_state))
# Check if a proxy has been defined
if self.zbxproxy:
# Check if proxy or proxy group is defined
if (self.zbxproxy["idtype"] in host and
host[self.zbxproxy["idtype"]] == self.zbxproxy["id"]):
host[self.zbxproxy["idtype"]] == self.zbxproxy["id"]):
self.logger.debug(f"Host {self.name}: proxy in-sync.")
# Backwards compatibility for Zabbix <= 6
elif "proxy_hostid" in host and host["proxy_hostid"] == self.zbxproxy["id"]:
@ -613,13 +730,15 @@ class PhysicalDevice():
else:
self.logger.warning(f"Host {self.name}: proxy OUT of sync.")
# Zabbix <= 6 patch
if not str(self.zabbix.version).startswith('7'):
self.updateZabbixHost(proxy_hostid=self.zbxproxy['id'])
if not str(self.zabbix.version).startswith("7"):
self.updateZabbixHost(proxy_hostid=self.zbxproxy["id"])
# Zabbix 7+
else:
# Prepare data structure for updating either proxy or group
update_data = {self.zbxproxy["idtype"]: self.zbxproxy["id"],
"monitored_by": self.zbxproxy['monitored_by']}
update_data = {
self.zbxproxy["idtype"]: self.zbxproxy["id"],
"monitored_by": self.zbxproxy["monitored_by"],
}
self.updateZabbixHost(**update_data)
else:
# No proxy is defined in NetBox
@ -631,8 +750,10 @@ class PhysicalDevice():
proxy_set = True
if proxy_power and proxy_set:
# Zabbix <= 6 fix
self.logger.warning(f"Host {self.name}: no proxy is configured in NetBox "
"but is configured in Zabbix. Removing proxy config in Zabbix")
self.logger.warning(
f"Host {self.name}: no proxy is configured in NetBox "
"but is configured in Zabbix. Removing proxy config in Zabbix"
)
if "proxy_hostid" in host and bool(host["proxy_hostid"]):
self.updateZabbixHost(proxy_hostid=0)
# Zabbix 7 proxy
@ -644,29 +765,65 @@ class PhysicalDevice():
# Checks if a proxy has been defined in Zabbix and if proxy_power config has been set
if proxy_set and not proxy_power:
# Display error message
self.logger.error(f"Host {self.name} is configured "
f"with proxy in Zabbix but not in NetBox. The"
" -p flag was ommited: no "
"changes have been made.")
self.logger.error(
f"Host {self.name} is configured "
f"with proxy in Zabbix but not in NetBox. The"
" -p flag was ommited: no "
"changes have been made."
)
if not proxy_set:
self.logger.debug(f"Host {self.name}: proxy in-sync.")
# Check host inventory mode
if str(host['inventory_mode']) == str(self.inventory_mode):
if str(host["inventory_mode"]) == str(self.inventory_mode):
self.logger.debug(f"Host {self.name}: inventory_mode in-sync.")
else:
self.logger.warning(f"Host {self.name}: inventory_mode OUT of sync.")
self.updateZabbixHost(inventory_mode=str(self.inventory_mode))
if inventory_sync and self.inventory_mode in [0,1]:
if config["inventory_sync"] and self.inventory_mode in [0, 1]:
# Check host inventory mapping
if host['inventory'] == self.inventory:
if host["inventory"] == self.inventory:
self.logger.debug(f"Host {self.name}: inventory in-sync.")
else:
self.logger.warning(f"Host {self.name}: inventory OUT of sync.")
self.updateZabbixHost(inventory=self.inventory)
# Check host usermacros
if config['usermacro_sync']:
# Make a full copy since we don't want to lose the original value
# of secret-type macros from NetBox
netbox_macros = deepcopy(self.usermacros)
# Set the sync bit
full_sync_bit = bool(str(config['usermacro_sync']).lower() == "full")
for macro in netbox_macros:
# If the Macro is a secret and full sync is NOT activated
if macro["type"] == str(1) and not full_sync_bit:
# Remove the value as the Zabbix api does not return the value key
# This is required when you want to do a diff between both lists
macro.pop("value")
# Sort all lists
def filter_with_macros(macro):
return macro["macro"]
host["macros"].sort(key=filter_with_macros)
netbox_macros.sort(key=filter_with_macros)
# Check if both lists are the same
if host["macros"] == netbox_macros:
self.logger.debug(f"Host {self.name}: usermacros in-sync.")
else:
self.logger.warning(f"Host {self.name}: usermacros OUT of sync.")
# Update Zabbix with NetBox usermacros
self.updateZabbixHost(macros=self.usermacros)
# Check host tags
if config['tag_sync']:
if remove_duplicates(host["tags"], sortkey="tag") == self.tags:
self.logger.debug(f"Host {self.name}: tags in-sync.")
else:
self.logger.warning(f"Host {self.name}: tags OUT of sync.")
self.updateZabbixHost(tags=self.tags)
# If only 1 interface has been found
# pylint: disable=too-many-nested-blocks
if len(host['interfaces']) == 1:
if len(host["interfaces"]) == 1:
updates = {}
# Go through each key / item and check if it matches Zabbix
for key, item in self.setInterfaceDetails()[0].items():
@ -674,7 +831,7 @@ class PhysicalDevice():
if key in host["interfaces"][0]:
# If SNMP is used, go through nested dict
# to compare SNMP parameters
if isinstance(item,dict) and key == "details":
if isinstance(item, dict) and key == "details":
for k, i in item.items():
if k in host["interfaces"][0][key]:
# Set update if values don't match
@ -702,16 +859,19 @@ class PhysicalDevice():
self.logger.warning(f"Host {self.name}: Interface OUT of sync.")
if "type" in updates:
# Changing interface type not supported. Raise exception.
e = (f"Host {self.name}: changing interface type to "
f"{str(updates['type'])} is not supported.")
e = (
f"Host {self.name}: changing interface type to "
f"{str(updates['type'])} is not supported."
)
self.logger.error(e)
raise InterfaceConfigError(e)
# Set interfaceID for Zabbix config
updates["interfaceid"] = host["interfaces"][0]['interfaceid']
updates["interfaceid"] = host["interfaces"][0]["interfaceid"]
try:
# API call to Zabbix
self.zabbix.hostinterface.update(updates)
e = f"Host {self.name}: solved interface conflict."
e = (f"Host {self.name}: updated interface "
f"with data {sanatize_log_output(updates)}.")
self.logger.info(e)
self.create_journal_entry("info", e)
except APIRequestError as e:
@ -723,9 +883,11 @@ class PhysicalDevice():
e = f"Host {self.name}: interface in-sync."
self.logger.debug(e)
else:
e = (f"Host {self.name} has unsupported interface configuration."
f" Host has total of {len(host['interfaces'])} interfaces. "
"Manual interfention required.")
e = (
f"Host {self.name} has unsupported interface configuration."
f" Host has total of {len(host['interfaces'])} interfaces. "
"Manual intervention required."
)
self.logger.error(e)
raise SyncInventoryError(e)
@ -737,20 +899,25 @@ class PhysicalDevice():
if self.journal:
# Check if the severity is valid
if severity not in ["info", "success", "warning", "danger"]:
self.logger.warning(f"Value {severity} not valid for NB journal entries.")
self.logger.warning(
f"Value {severity} not valid for NB journal entries."
)
return False
journal = {"assigned_object_type": "dcim.device",
"assigned_object_id": self.id,
"kind": severity,
"comments": message
}
journal = {
"assigned_object_type": "dcim.device",
"assigned_object_id": self.id,
"kind": severity,
"comments": message,
}
try:
self.nb_journals.create(journal)
self.logger.debug(f"Host {self.name}: Created journal entry in NetBox")
return True
except JournalError(e) as e:
self.logger.warning("Unable to create journal entry for "
f"{self.name}: NB returned {e}")
except NetboxRequestError as e:
self.logger.warning(
"Unable to create journal entry for "
f"{self.name}: NB returned {e}"
)
return False
return False
@ -773,10 +940,15 @@ class PhysicalDevice():
# and add this NB template to the list of successful templates
tmpls_from_zabbix.pop(pos)
succesfull_templates.append(nb_tmpl)
self.logger.debug(f"Host {self.name}: template "
f"{nb_tmpl['name']} is present in Zabbix.")
self.logger.debug(
f"Host {self.name}: template "
f"{nb_tmpl['name']} is present in Zabbix."
)
break
if len(succesfull_templates) == len(self.zbx_templates) and len(tmpls_from_zabbix) == 0:
if (
len(succesfull_templates) == len(self.zbx_templates)
and len(tmpls_from_zabbix) == 0
):
# All of the NetBox templates have been confirmed as successful
# and the ZBX template list is empty. This means that
# all of the templates match.
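The usermacro check earlier in this hunk strips the value of secret macros before diffing, because the Zabbix API never returns secret values. A minimal hedged sketch of that comparison, standalone and with made-up helper and sample names:

from copy import deepcopy

def macros_in_sync(zabbix_macros, netbox_macros, full_sync=False):
    """Compare macro lists the way ConsistencyCheck does: secret (type "1") values
    are dropped from the NetBox side unless a full sync is requested."""
    nb_macros = deepcopy(netbox_macros)
    for macro in nb_macros:
        if macro.get("type") == "1" and not full_sync:
            macro.pop("value", None)
    key = lambda m: m["macro"]
    return sorted(zabbix_macros, key=key) == sorted(nb_macros, key=key)

zbx = [{"macro": "{$SECRET}", "type": "1", "description": ""}]
nbx = [{"macro": "{$SECRET}", "type": "1", "description": "", "value": "hunter2"}]
print(macros_in_sync(zbx, nbx))                   # True: name/type match, value ignored
print(macros_in_sync(zbx, nbx, full_sync=True))   # False: full sync compares values too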

View File

@ -2,32 +2,47 @@
"""
All custom exceptions used for Exception generation
"""
class SyncError(Exception):
""" Class SyncError """
"""Class SyncError"""
class JournalError(Exception):
""" Class SyncError """
"""Class SyncError"""
class SyncExternalError(SyncError):
""" Class SyncExternalError """
"""Class SyncExternalError"""
class SyncInventoryError(SyncError):
""" Class SyncInventoryError """
"""Class SyncInventoryError"""
class SyncDuplicateError(SyncError):
""" Class SyncDuplicateError """
"""Class SyncDuplicateError"""
class EnvironmentVarError(SyncError):
""" Class EnvironmentVarError """
"""Class EnvironmentVarError"""
class InterfaceConfigError(SyncError):
""" Class InterfaceConfigError """
"""Class InterfaceConfigError"""
class ProxyConfigError(SyncError):
""" Class ProxyConfigError """
"""Class ProxyConfigError"""
class HostgroupError(SyncError):
""" Class HostgroupError """
"""Class HostgroupError"""
class TemplateError(SyncError):
""" Class TemplateError """
"""Class TemplateError"""
class UsermacroError(SyncError):
"""Class UsermacroError"""

View File

@ -1,14 +1,27 @@
"""Module for all hostgroup related code"""
from logging import getLogger
from modules.exceptions import HostgroupError
from modules.tools import build_path
class Hostgroup():
class Hostgroup:
"""Hostgroup class for devices and VM's
Takes type (vm or dev) and NB object"""
def __init__(self, obj_type, nb_obj, version, logger=None, #pylint: disable=too-many-arguments, too-many-positional-arguments
nested_sitegroup_flag=False, nested_region_flag=False,
nb_regions=None, nb_groups=None):
# pylint: disable=too-many-arguments, disable=too-many-positional-arguments
def __init__(
self,
obj_type,
nb_obj,
version,
logger=None,
nested_sitegroup_flag=False,
nested_region_flag=False,
nb_regions=None,
nb_groups=None,
):
self.logger = logger if logger else getLogger(__name__)
if obj_type not in ("vm", "dev"):
msg = f"Unable to create hostgroup with type {type}"
@ -19,8 +32,9 @@ class Hostgroup():
self.name = self.nb.name
self.nb_version = version
# Used for nested data objects
self.set_nesting(nested_sitegroup_flag, nested_region_flag,
nb_groups, nb_regions)
self.set_nesting(
nested_sitegroup_flag, nested_region_flag, nb_groups, nb_regions
)
self._set_format_options()
def __str__(self):
@ -49,40 +63,52 @@ class Hostgroup():
format_options["site_group"] = None
if self.nb.site:
if self.nb.site.region:
format_options["region"] = self.generate_parents("region",
str(self.nb.site.region))
format_options["region"] = self.generate_parents(
"region", str(self.nb.site.region)
)
if self.nb.site.group:
format_options["site_group"] = self.generate_parents("site_group",
str(self.nb.site.group))
format_options["site_group"] = self.generate_parents(
"site_group", str(self.nb.site.group)
)
format_options["role"] = role
format_options["site"] = self.nb.site.name if self.nb.site else None
format_options["tenant"] = str(self.nb.tenant) if self.nb.tenant else None
format_options["tenant_group"] = str(self.nb.tenant.group) if self.nb.tenant else None
format_options["platform"] = self.nb.platform.name if self.nb.platform else None
format_options["tenant_group"] = (
str(self.nb.tenant.group) if self.nb.tenant else None
)
format_options["platform"] = (
self.nb.platform.name if self.nb.platform else None
)
# Variables only applicable for devices
if self.type == "dev":
format_options["manufacturer"] = self.nb.device_type.manufacturer.name
format_options["location"] = str(self.nb.location) if self.nb.location else None
format_options["location"] = (
str(self.nb.location) if self.nb.location else None
)
# Variables only applicable for VM's
if self.type == "vm":
# Check if a cluster is configured. Could also be configured in a site.
if self.nb.cluster:
format_options["cluster"] = self.nb.cluster.name
format_options["cluster_type"] = self.nb.cluster.type.name
self.format_options = format_options
def set_nesting(self, nested_sitegroup_flag, nested_region_flag,
nb_groups, nb_regions):
def set_nesting(
self, nested_sitegroup_flag, nested_region_flag, nb_groups, nb_regions
):
"""Set nesting options for this Hostgroup"""
self.nested_objects = {"site_group": {"flag": nested_sitegroup_flag, "data": nb_groups},
"region": {"flag": nested_region_flag, "data": nb_regions}}
self.nested_objects = {
"site_group": {"flag": nested_sitegroup_flag, "data": nb_groups},
"region": {"flag": nested_region_flag, "data": nb_regions},
}
def generate(self, hg_format=None):
"""Generate hostgroup based on a provided format"""
# Set format to default in case it's not specified
if not hg_format:
hg_format = "site/manufacturer/role" if self.type == "dev" else "cluster/role"
hg_format = (
"site/manufacturer/role" if self.type == "dev" else "cluster/role"
)
# Split all given names
hg_output = []
hg_items = hg_format.split("/")
@ -93,8 +119,10 @@ class Hostgroup():
cf_data = self.custom_field_lookup(hg_item)
# CF does not exist
if not cf_data["result"]:
msg = (f"Unable to generate hostgroup for host {self.name}. "
f"Item type {hg_item} not supported.")
msg = (
f"Unable to generate hostgroup for host {self.name}. "
f"Item type {hg_item} not supported."
)
self.logger.error(msg)
raise HostgroupError(msg)
# CF data is populated
@ -109,10 +137,12 @@ class Hostgroup():
# Check if the hostgroup is populated with at least one item.
if bool(hg_output):
return "/".join(hg_output)
msg = (f"Unable to generate hostgroup for host {self.name}."
" Not enough valid items. This is most likely"
" due to the use of custom fields that are empty"
" or an invalid hostgroup format.")
msg = (
f"Unable to generate hostgroup for host {self.name}."
" Not enough valid items. This is most likely"
" due to the use of custom fields that are empty"
" or an invalid hostgroup format."
)
self.logger.error(msg)
raise HostgroupError(msg)
@ -157,7 +187,9 @@ class Hostgroup():
return child_object
# If the nested flag is True, perform parent calculation
if self.nested_objects[nest_type]["flag"]:
final_nested_object = build_path(child_object, self.nested_objects[nest_type]["data"])
final_nested_object = build_path(
child_object, self.nested_objects[nest_type]["data"]
)
return "/".join(final_nested_object)
# Nesting is not allowed for this object. Return child_object
return child_object
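To illustrate how the "/"-separated hostgroup format above resolves, here is a small standalone sketch; render_hostgroup and the sample values are illustrative assumptions, not project code:

def render_hostgroup(fmt, options):
    """Mimic Hostgroup.generate(): resolve each item, drop empty ones, join with '/'."""
    parts = [options.get(item) for item in fmt.split("/")]
    parts = [p for p in parts if p]
    if not parts:
        raise ValueError("Not enough valid items to build a hostgroup")
    return "/".join(parts)

options = {"site": "AMS1", "manufacturer": "Cisco", "role": "router", "tenant": None}
print(render_hostgroup("site/manufacturer/role", options))  # AMS1/Cisco/router
print(render_hostgroup("tenant/site/role", options))        # AMS1/router (empty tenant skipped)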

View File

@ -4,7 +4,8 @@ All of the Zabbix interface related configuration
"""
from modules.exceptions import InterfaceConfigError
class ZabbixInterface():
class ZabbixInterface:
"""Class that represents a Zabbix interface."""
def __init__(self, context, ip):
@ -15,21 +16,16 @@ class ZabbixInterface():
def _set_default_port(self):
"""Sets default TCP / UDP port for different interface types"""
interface_mapping = {
1: 10050,
2: 161,
3: 623,
4: 12345
}
interface_mapping = {1: 10050, 2: 161, 3: 623, 4: 12345}
# Check if interface type is listed in mapper.
if self.interface['type'] not in interface_mapping:
if self.interface["type"] not in interface_mapping:
return False
# Set default port to interface
self.interface["port"] = str(interface_mapping[self.interface['type']])
self.interface["port"] = str(interface_mapping[self.interface["type"]])
return True
def get_context(self):
""" check if NetBox custom context has been defined. """
"""check if NetBox custom context has been defined."""
if "zabbix" in self.context:
zabbix = self.context["zabbix"]
if "interface_type" in zabbix:
@ -43,7 +39,7 @@ class ZabbixInterface():
return False
def set_snmp(self):
""" Check if interface is type SNMP """
"""Check if interface is type SNMP"""
# pylint: disable=too-many-branches
if self.interface["type"] == 2:
# Checks if SNMP settings are defined in NetBox
@ -63,7 +59,7 @@ class ZabbixInterface():
e = "SNMP version option is not defined."
raise InterfaceConfigError(e)
# If version 1 or 2 is used, get community string
if self.interface["details"]["version"] in ['1','2']:
if self.interface["details"]["version"] in ["1", "2"]:
if "community" in snmp:
# Set SNMP community to config context value
community = snmp["community"]
@ -73,10 +69,16 @@ class ZabbixInterface():
self.interface["details"]["community"] = str(community)
# If version 3 has been used, get all
# SNMPv3 NetBox related configs
elif self.interface["details"]["version"] == '3':
items = ["securityname", "securitylevel", "authpassphrase",
"privpassphrase", "authprotocol", "privprotocol",
"contextname"]
elif self.interface["details"]["version"] == "3":
items = [
"securityname",
"securitylevel",
"authpassphrase",
"privpassphrase",
"authprotocol",
"privprotocol",
"contextname",
]
for key, item in snmp.items():
if key in items:
self.interface["details"][key] = str(item)
@ -91,13 +93,15 @@ class ZabbixInterface():
raise InterfaceConfigError(e)
def set_default_snmp(self):
""" Set default config to SNMPv2, port 161 and community macro. """
"""Set default config to SNMPv2, port 161 and community macro."""
self.interface = self.skelet
self.interface["type"] = "2"
self.interface["port"] = "161"
self.interface["details"] = {"version": "2",
"community": "{$SNMP_COMMUNITY}",
"bulk": "1"}
self.interface["details"] = {
"version": "2",
"community": "{$SNMP_COMMUNITY}",
"bulk": "1",
}
def set_default_agent(self):
"""Sets interface to Zabbix agent defaults"""

41
modules/logging.py Normal file
View File

@ -0,0 +1,41 @@
"""
Logging module for Netbox-Zabbix-sync
"""
import logging
from os import path
logger = logging.getLogger("NetBox-Zabbix-sync")
def get_logger():
"""
Return the logger for Netbox Zabbix Sync
"""
return logger
def setup_logger():
"""
Prepare a logger with stream and file handlers
"""
# Set logging
lgout = logging.StreamHandler()
# Logfile in the project root
project_root = path.dirname(path.dirname(path.realpath(__file__)))
logfile_path = path.join(project_root, "sync.log")
lgfile = logging.FileHandler(logfile_path)
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
level=logging.WARNING,
handlers=[lgout, lgfile],
)
def set_log_levels(root_level, own_level):
"""
Configure log levels for root and Netbox-Zabbix-sync logger
"""
logging.getLogger().setLevel(root_level)
logger.setLevel(own_level)
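A hedged usage sketch for the helpers above, assuming the modules package is importable from the project root:

import logging
from modules.logging import setup_logger, get_logger, set_log_levels

setup_logger()                                  # stream handler + sync.log, WARNING by default
logger = get_logger()                           # the "NetBox-Zabbix-sync" logger
set_log_levels(logging.WARNING, logging.INFO)   # roughly what the -v flag does in the main script
logger.info("sync started")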

133
modules/tags.py Normal file
View File

@ -0,0 +1,133 @@
#!/usr/bin/env python3
# pylint: disable=too-many-instance-attributes, too-many-arguments, too-many-positional-arguments, logging-fstring-interpolation
"""
All of the Zabbix tag related configuration
"""
from logging import getLogger
from modules.tools import field_mapper, remove_duplicates
class ZabbixTags:
"""Class that represents a Zabbix interface."""
def __init__(
self,
nb,
tag_map,
tag_sync,
tag_lower=True,
tag_name=None,
tag_value=None,
logger=None,
host=None,
):
self.nb = nb
self.name = host if host else nb.name
self.tag_map = tag_map
self.logger = logger if logger else getLogger(__name__)
self.tags = {}
self.lower = tag_lower
self.tag_name = tag_name
self.tag_value = tag_value
self.tag_sync = tag_sync
self.sync = False
self._set_config()
def __repr__(self):
return self.name
def __str__(self):
return self.__repr__()
def _set_config(self):
"""
Setup class
"""
if self.tag_sync:
self.sync = True
return True
def validate_tag(self, tag_name):
"""
Validates tag name
"""
if tag_name and isinstance(tag_name, str) and len(tag_name) <= 256:
return True
return False
def validate_value(self, tag_value):
"""
Validates tag value
"""
if tag_value and isinstance(tag_value, str) and len(tag_value) <= 256:
return True
return False
def render_tag(self, tag_name, tag_value):
"""
Renders a tag
"""
tag = {}
if self.validate_tag(tag_name):
if self.lower:
tag["tag"] = tag_name.lower()
else:
tag["tag"] = tag_name
else:
self.logger.warning(f"Tag {tag_name} is not a valid tag name, skipping.")
return False
if self.validate_value(tag_value):
if self.lower:
tag["value"] = tag_value.lower()
else:
tag["value"] = tag_value
else:
self.logger.warning(
f"Tag {tag_name} has an invalid value: '{tag_value}', skipping."
)
return False
return tag
def generate(self):
"""
Generate full set of tags
"""
# pylint: disable=too-many-branches
tags = []
# Parse the field mapper for tags
if self.tag_map:
self.logger.debug(f"Host {self.nb.name}: Starting tag mapper")
field_tags = field_mapper(self.nb.name, self.tag_map, self.nb, self.logger)
for tag, value in field_tags.items():
t = self.render_tag(tag, value)
if t:
tags.append(t)
# Parse NetBox config context for tags
if (
"zabbix" in self.nb.config_context
and "tags" in self.nb.config_context["zabbix"]
and isinstance(self.nb.config_context["zabbix"]["tags"], list)
):
for tag in self.nb.config_context["zabbix"]["tags"]:
if isinstance(tag, dict):
for tagname, value in tag.items():
t = self.render_tag(tagname, value)
if t:
tags.append(t)
# Pull in NetBox device tags if tag_name is set
if self.tag_name and isinstance(self.tag_name, str):
for tag in self.nb.tags:
if self.tag_value.lower() in ["display", "name", "slug"]:
value = tag[self.tag_value]
else:
value = tag["name"]
t = self.render_tag(self.tag_name, value)
if t:
tags.append(t)
return remove_duplicates(tags, sortkey="tag")
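A minimal illustration of the tag rendering rules above (lowercasing plus the 256-character limit); the function body is a simplified stand-in and the inputs are made up for the example:

def render_tag(tag_name, tag_value, lower=True):
    if not (isinstance(tag_name, str) and 0 < len(tag_name) <= 256):
        return False
    if not (isinstance(tag_value, str) and 0 < len(tag_value) <= 256):
        return False
    if lower:
        tag_name, tag_value = tag_name.lower(), tag_value.lower()
    return {"tag": tag_name, "value": tag_value}

print(render_tag("NetBox-Role", "Router"))  # {'tag': 'netbox-role', 'value': 'router'}
print(render_tag("NetBox-Role", ""))        # False (empty value is rejected)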

View File

@ -1,11 +1,14 @@
"""A collection of tools used by several classes"""
from modules.exceptions import HostgroupError
def convert_recordset(recordset):
""" Converts netbox RedcordSet to list of dicts. """
"""Converts netbox RedcordSet to list of dicts."""
recordlist = []
for record in recordset:
recordlist.append(record.__dict__)
return recordlist
def build_path(endpoint, list_of_dicts):
"""
Builds a path list of related parent/child items.
@ -13,16 +16,17 @@ def build_path(endpoint, list_of_dicts):
be used in hostgroups.
"""
item_path = []
itemlist = [i for i in list_of_dicts if i['name'] == endpoint]
itemlist = [i for i in list_of_dicts if i["name"] == endpoint]
item = itemlist[0] if len(itemlist) == 1 else None
item_path.append(item['name'])
while item['_depth'] > 0:
itemlist = [i for i in list_of_dicts if i['name'] == str(item['parent'])]
item_path.append(item["name"])
while item["_depth"] > 0:
itemlist = [i for i in list_of_dicts if i["name"] == str(item["parent"])]
item = itemlist[0] if len(itemlist) == 1 else None
item_path.append(item['name'])
item_path.append(item["name"])
item_path.reverse()
return item_path
def proxy_prepper(proxy_list, proxy_group_list):
"""
Function that takes 2 lists and converts them using a
@ -42,3 +46,147 @@ def proxy_prepper(proxy_list, proxy_group_list):
group["monitored_by"] = 2
output.append(group)
return output
def field_mapper(host, mapper, nbdevice, logger):
"""
Maps NetBox field data to Zabbix properties.
Used for Inventory, Usermacros and Tag mappings.
"""
data = {}
# Let's build a dict for each property in the map
for nb_field, zbx_field in mapper.items():
field_list = nb_field.split("/") # convert str to list based on delimiter
# start at the base of the dict...
value = nbdevice
# ... and step through the dict till we find the needed value
for item in field_list:
value = value[item] if value else None
# Check if the result is usable and expected
# We want to apply any int or float 0 values,
# even if python thinks those are empty.
if (value and isinstance(value, int | float | str)) or (
isinstance(value, int | float) and int(value) == 0
):
data[zbx_field] = str(value)
elif not value:
# empty value should just be an empty string for API compatibility
logger.debug(
f"Host {host}: NetBox lookup for "
f"'{nb_field}' returned an empty value"
)
data[zbx_field] = ""
else:
# Value is not a string or numeral, probably not what the user expected.
logger.error(
f"Host {host}: Lookup for '{nb_field}'"
" returned an unexpected type: it will be skipped."
)
logger.debug(
f"Host {host}: Field mapping complete. "
f"Mapped {len(list(filter(None, data.values())))} field(s)"
)
return data
def remove_duplicates(input_list, sortkey=None):
"""
Removes duplicate entries from a list and sorts the list
"""
output_list = []
if isinstance(input_list, list):
output_list = [dict(t) for t in {tuple(d.items()) for d in input_list}]
if sortkey and isinstance(sortkey, str):
output_list.sort(key=lambda x: x[sortkey])
return output_list
def verify_hg_format(hg_format, device_cfs=None, vm_cfs=None, hg_type="dev", logger=None):
"""
Verifies hostgroup field format
"""
if not device_cfs:
device_cfs = []
if not vm_cfs:
vm_cfs = []
allowed_objects = {"dev": ["location",
"rack",
"role",
"manufacturer",
"region",
"site",
"site_group",
"tenant",
"tenant_group",
"platform",
"cluster"]
,"vm": ["location",
"role",
"manufacturer",
"region",
"site",
"site_group",
"tenant",
"tenant_group",
"cluster",
"device",
"platform"]
,"cfs": {"dev": [], "vm": []}
}
for cf in device_cfs:
allowed_objects['cfs']['dev'].append(cf.name)
for cf in vm_cfs:
allowed_objects['cfs']['vm'].append(cf.name)
hg_objects = []
if isinstance(hg_format,list):
for f in hg_format:
hg_objects = hg_objects + f.split("/")
else:
hg_objects = hg_format.split("/")
hg_objects = sorted(set(hg_objects))
for hg_object in hg_objects:
if (hg_object not in allowed_objects[hg_type] and
hg_object not in allowed_objects['cfs'][hg_type]):
e = (
f"Hostgroup item {hg_object} is not valid. Make sure you"
" use valid items and separate them with '/'."
)
logger.error(e)
raise HostgroupError(e)
def sanatize_log_output(data):
"""
Used for the update function to Zabbix, which
shows the data that it's using to update the host.
Removes any sensitive data from the input.
"""
if not isinstance(data, dict):
return data
sanitized_data = data.copy()
# Check if there are any sensitive macros defined in the data
if "macros" in data:
for macro in sanitized_data["macros"]:
# Check if macro is secret type
if not macro["type"] == str(1):
continue
macro["value"] = "********"
# Check for interface data
if "interfaceid" in data:
# Interface ID is a value which is most likely not helpful
# in logging output or for troubleshooting.
del sanitized_data["interfaceid"]
# InterfaceID also hints that this is an interface update.
# A check is required to see whether macros are used for the SNMP security parameters.
if not "details" in data:
return sanitized_data
for key, detail in sanitized_data["details"].items():
# If the detail is a secret, we don't want to log it.
if key in ("authpassphrase", "privpassphrase", "securityname", "community"):
# Check if a macro is used.
# If so then logging the output is not a security issue.
if detail.startswith("{$") and detail.endswith("}"):
continue
# A macro is not used, so we sanitize the value.
sanitized_data["details"][key] = "********"
return sanitized_data
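A hedged example of the field_mapper path syntax above, using plain dicts in place of a pynetbox record; the mapping keys and device data are assumptions for illustration:

from logging import getLogger
from modules.tools import field_mapper

device = {"name": "rtr-01", "serial": "ABC123", "device_type": {"model": "ISR4331"}}
inventory_map = {"serial": "serialno_a", "device_type/model": "model"}

print(field_mapper("rtr-01", inventory_map, device, getLogger(__name__)))
# {'serialno_a': 'ABC123', 'model': 'ISR4331'}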

122
modules/usermacros.py Normal file
View File

@ -0,0 +1,122 @@
#!/usr/bin/env python3
# pylint: disable=too-many-instance-attributes, too-many-arguments, too-many-positional-arguments, logging-fstring-interpolation
"""
All of the Zabbix Usermacro related configuration
"""
from logging import getLogger
from re import match
from modules.tools import field_mapper
class ZabbixUsermacros:
"""Class that represents Zabbix usermacros."""
def __init__(self, nb, usermacro_map, usermacro_sync, logger=None, host=None):
self.nb = nb
self.name = host if host else nb.name
self.usermacro_map = usermacro_map
self.logger = logger if logger else getLogger(__name__)
self.usermacros = {}
self.usermacro_sync = usermacro_sync
self.sync = False
self.force_sync = False
self._set_config()
def __repr__(self):
return self.name
def __str__(self):
return self.__repr__()
def _set_config(self):
"""
Setup class
"""
if str(self.usermacro_sync).lower() == "full":
self.sync = True
self.force_sync = True
elif self.usermacro_sync:
self.sync = True
return True
def validate_macro(self, macro_name):
"""
Validates usermacro name
"""
pattern = r"\{\$[A-Z0-9\._]*(\:.*)?\}"
return match(pattern, macro_name)
def render_macro(self, macro_name, macro_properties):
"""
Renders a full usermacro from partial input
"""
macro = {}
macrotypes = {"text": 0, "secret": 1, "vault": 2}
if self.validate_macro(macro_name):
macro["macro"] = str(macro_name)
if isinstance(macro_properties, dict):
if not "value" in macro_properties:
self.logger.warning(f"Host {self.name}: Usermacro {macro_name} has "
"no value in Netbox, skipping.")
return False
macro["value"] = macro_properties["value"]
if (
"type" in macro_properties
and macro_properties["type"].lower() in macrotypes
):
macro["type"] = str(macrotypes[macro_properties["type"]])
else:
macro["type"] = str(0)
if "description" in macro_properties and isinstance(
macro_properties["description"], str
):
macro["description"] = macro_properties["description"]
else:
macro["description"] = ""
elif isinstance(macro_properties, str) and macro_properties:
macro["value"] = macro_properties
macro["type"] = str(0)
macro["description"] = ""
else:
self.logger.warning(f"Host {self.name}: Usermacro {macro_name} "
"has no value, skipping.")
return False
else:
self.logger.error(
f"Host {self.name}: Usermacro {macro_name} is not a valid usermacro name, skipping."
)
return False
return macro
def generate(self):
"""
Generate full set of Usermacros
"""
macros = []
# Parse the field mapper for usermacros
if self.usermacro_map:
self.logger.debug(f"Host {self.nb.name}: Starting usermacro mapper")
field_macros = field_mapper(
self.nb.name, self.usermacro_map, self.nb, self.logger
)
for macro, value in field_macros.items():
m = self.render_macro(macro, value)
if m:
macros.append(m)
# Parse NetBox config context for usermacros
if (
"zabbix" in self.nb.config_context
and "usermacros" in self.nb.config_context["zabbix"]
):
for macro, properties in self.nb.config_context["zabbix"][
"usermacros"
].items():
m = self.render_macro(macro, properties)
if m:
macros.append(m)
return macros
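A sketch of the two config-context shapes render_macro accepts: a plain string value, or a dict with value/type/description. The macro names and values are examples only:

config_context = {
    "zabbix": {
        "usermacros": {
            "{$SNMP_COMMUNITY}": "public",          # rendered as type "0" (text)
            "{$API_TOKEN}": {
                "value": "supersecret",
                "type": "secret",                   # rendered as type "1"
                "description": "token used by the app template",
            },
        }
    }
}

With usermacro_sync enabled both entries are rendered by generate(); the secret value is additionally masked by sanatize_log_output when the host update is logged.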

View File

@ -1,42 +1,55 @@
#!/usr/bin/env python3
# pylint: disable=duplicate-code
"""Module that hosts all functions for virtual machine processing"""
from os import sys
from modules.device import PhysicalDevice
from modules.exceptions import InterfaceConfigError, SyncInventoryError, TemplateError
from modules.hostgroups import Hostgroup
from modules.interface import ZabbixInterface
from modules.exceptions import TemplateError, InterfaceConfigError, SyncInventoryError
try:
from config import (
traverse_site_groups,
traverse_regions
)
except ModuleNotFoundError:
print("Configuration file config.py not found in main directory."
"Please create the file or rename the config.py.example file to config.py.")
sys.exit(0)
from modules.config import load_config
# Load config
config = load_config()
class VirtualMachine(PhysicalDevice):
"""Model for virtual machines"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.hostgroup = None
self.zbx_template_names = None
def _inventory_map(self):
"""use VM inventory maps"""
return config["vm_inventory_map"]
def _usermacro_map(self):
"""use VM usermacro maps"""
return config["vm_usermacro_map"]
def _tag_map(self):
"""use VM tag maps"""
return config["vm_tag_map"]
def set_hostgroup(self, hg_format, nb_site_groups, nb_regions):
"""Set the hostgroup for this device"""
# Create new Hostgroup instance
hg = Hostgroup("vm", self.nb, self.nb_api_version, logger=self.logger,
nested_sitegroup_flag=traverse_site_groups,
nested_region_flag=traverse_regions,
nb_groups=nb_site_groups,
nb_regions=nb_regions)
hg = Hostgroup(
"vm",
self.nb,
self.nb_api_version,
logger=self.logger,
nested_sitegroup_flag=config["traverse_site_groups"],
nested_region_flag=config["traverse_regions"],
nb_groups=nb_site_groups,
nb_regions=nb_regions,
)
# Generate hostgroup based on hostgroup format
self.hostgroup = hg.generate(hg_format)
if isinstance(hg_format, list):
self.hostgroups = [hg.generate(f) for f in hg_format]
else:
self.hostgroups.append(hg.generate(hg_format))
def set_vm_template(self):
""" Set Template for VMs. Overwrites default class
"""Set Template for VMs. Overwrites default class
to skip a lookup of custom fields."""
# Gather templates ONLY from the device specific context
try:
@ -45,7 +58,7 @@ class VirtualMachine(PhysicalDevice):
self.logger.warning(e)
return True
def setInterfaceDetails(self): # pylint: disable=invalid-name
def setInterfaceDetails(self): # pylint: disable=invalid-name
"""
Overwrites device function to select an agent interface type by default
Agent type interfaces are more likely to be used with VMs than SNMP
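A hedged sketch of the single-format versus list-of-formats handling in set_hostgroup above; generate() is stubbed with a plain function and example data:

def generate(fmt):
    data = {"cluster": "esx-cluster-01", "role": "vm", "site": "AMS1"}
    return "/".join(data[item] for item in fmt.split("/") if item in data)

hg_format = ["cluster/role", "site/role"]       # may also be a single string
if isinstance(hg_format, list):
    hostgroups = [generate(f) for f in hg_format]
else:
    hostgroups = [generate(hg_format)]
print(hostgroups)  # ['esx-cluster-01/vm', 'AMS1/vm']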

View File

@ -2,53 +2,27 @@
# pylint: disable=invalid-name, logging-not-lazy, too-many-locals, logging-fstring-interpolation
"""NetBox to Zabbix sync script."""
import logging
import argparse
import logging
import ssl
from os import environ, path, sys
from os import environ, sys
from pynetbox import api
from pynetbox.core.query import RequestError as NBRequestError
from requests.exceptions import ConnectionError as RequestsConnectionError
from zabbix_utils import ZabbixAPI, APIRequestError, ProcessingError
from zabbix_utils import APIRequestError, ProcessingError, ZabbixAPI
from modules.config import load_config
from modules.device import PhysicalDevice
from modules.exceptions import EnvironmentVarError, SyncError
from modules.logging import get_logger, set_log_levels, setup_logger
from modules.tools import convert_recordset, proxy_prepper, verify_hg_format
from modules.virtual_machine import VirtualMachine
from modules.tools import convert_recordset, proxy_prepper
from modules.exceptions import EnvironmentVarError, HostgroupError, SyncError
try:
from config import (
templates_config_context,
templates_config_context_overrule,
clustering, create_hostgroups,
create_journal, full_proxy_sync,
zabbix_device_removal,
zabbix_device_disable,
hostgroup_format,
vm_hostgroup_format,
nb_device_filter,
sync_vms,
nb_vm_filter
)
except ModuleNotFoundError:
print("Configuration file config.py not found in main directory."
"Please create the file or rename the config.py.example file to config.py.")
sys.exit(1)
# Set logging
log_format = logging.Formatter('%(asctime)s - %(name)s - '
'%(levelname)s - %(message)s')
lgout = logging.StreamHandler()
lgout.setFormatter(log_format)
lgout.setLevel(logging.DEBUG)
config = load_config()
lgfile = logging.FileHandler(path.join(path.dirname(
path.realpath(__file__)), "sync.log"))
lgfile.setFormatter(log_format)
lgfile.setLevel(logging.DEBUG)
logger = logging.getLogger("NetBox-Zabbix-sync")
logger.addHandler(lgout)
logger.addHandler(lgfile)
logger.setLevel(logging.WARNING)
setup_logger()
logger = get_logger()
def main(arguments):
@ -56,7 +30,14 @@ def main(arguments):
# pylint: disable=too-many-branches, too-many-statements
# set environment variables
if arguments.verbose:
logger.setLevel(logging.DEBUG)
set_log_levels(logging.WARNING, logging.INFO)
if arguments.debug:
set_log_levels(logging.WARNING, logging.DEBUG)
if arguments.debug_all:
set_log_levels(logging.DEBUG, logging.DEBUG)
if arguments.quiet:
set_log_levels(logging.ERROR, logging.ERROR)
env_vars = ["ZABBIX_HOST", "NETBOX_HOST", "NETBOX_TOKEN"]
if "ZABBIX_TOKEN" in environ:
env_vars.append("ZABBIX_TOKEN")
@ -82,29 +63,34 @@ def main(arguments):
netbox_token = environ.get("NETBOX_TOKEN")
# Set NetBox API
netbox = api(netbox_host, token=netbox_token, threading=True)
# Check if the provided Hostgroup layout is valid
hg_objects = hostgroup_format.split("/")
allowed_objects = ["location", "role", "manufacturer", "region",
"site", "site_group", "tenant", "tenant_group"]
# Create API call to get all custom fields which are on the device objects
try:
device_cfs = list(netbox.extras.custom_fields.filter(
type="text", content_type_id=23))
# Get NetBox version
nb_version = netbox.version
logger.debug(f"NetBox version is {nb_version}.")
except RequestsConnectionError:
logger.error(f"Unable to connect to NetBox with URL {netbox_host}."
" Please check the URL and status of NetBox.")
logger.error(
f"Unable to connect to NetBox with URL {netbox_host}."
" Please check the URL and status of NetBox."
)
sys.exit(1)
except NBRequestError as e:
logger.error(f"NetBox error: {e}")
sys.exit(1)
for cf in device_cfs:
allowed_objects.append(cf.name)
for hg_object in hg_objects:
if hg_object not in allowed_objects:
e = (f"Hostgroup item {hg_object} is not valid. Make sure you"
" use valid items and seperate them with '/'.")
logger.error(e)
raise HostgroupError(e)
# Check if the provided Hostgroup layout is valid
device_cfs = []
vm_cfs = []
device_cfs = list(
netbox.extras.custom_fields.filter(type="text", content_types="dcim.device")
)
verify_hg_format(config["hostgroup_format"],
device_cfs=device_cfs, hg_type="dev", logger=logger)
if config["sync_vms"]:
vm_cfs = list(
netbox.extras.custom_fields.filter(type="text",
content_types="virtualization.virtualmachine")
)
verify_hg_format(config["vm_hostgroup_format"], vm_cfs=vm_cfs, hg_type="vm", logger=logger)
# Set Zabbix API
try:
ssl_ctx = ssl.create_default_context()
@ -114,96 +100,100 @@ def main(arguments):
ssl_ctx.load_verify_locations(environ["REQUESTS_CA_BUNDLE"])
if not zabbix_token:
zabbix = ZabbixAPI(zabbix_host, user=zabbix_user,
password=zabbix_pass, ssl_context=ssl_ctx)
else:
zabbix = ZabbixAPI(
zabbix_host, token=zabbix_token, ssl_context=ssl_ctx)
zabbix_host, user=zabbix_user, password=zabbix_pass, ssl_context=ssl_ctx
)
else:
zabbix = ZabbixAPI(zabbix_host, token=zabbix_token, ssl_context=ssl_ctx)
zabbix.check_auth()
except (APIRequestError, ProcessingError) as e:
e = f"Zabbix returned the following error: {str(e)}"
logger.error(e)
sys.exit(1)
# Set API parameter mapping based on API version
if not str(zabbix.version).startswith('7'):
if not str(zabbix.version).startswith("7"):
proxy_name = "host"
else:
proxy_name = "name"
# Get all Zabbix and NetBox data
netbox_devices = list(netbox.dcim.devices.filter(**nb_device_filter))
netbox_devices = list(netbox.dcim.devices.filter(**config["nb_device_filter"]))
netbox_vms = []
if sync_vms:
if config["sync_vms"]:
netbox_vms = list(
netbox.virtualization.virtual_machines.filter(**nb_vm_filter))
netbox.virtualization.virtual_machines.filter(**config["nb_vm_filter"]))
netbox_site_groups = convert_recordset((netbox.dcim.site_groups.all()))
netbox_regions = convert_recordset(netbox.dcim.regions.all())
netbox_journals = netbox.extras.journal_entries
zabbix_groups = zabbix.hostgroup.get(output=['groupid', 'name'])
zabbix_templates = zabbix.template.get(output=['templateid', 'name'])
zabbix_proxies = zabbix.proxy.get(output=['proxyid', proxy_name])
zabbix_groups = zabbix.hostgroup.get(output=["groupid", "name"])
zabbix_templates = zabbix.template.get(output=["templateid", "name"])
zabbix_proxies = zabbix.proxy.get(output=["proxyid", proxy_name])
# Set empty list for proxy processing Zabbix <= 6
zabbix_proxygroups = []
if str(zabbix.version).startswith('7'):
zabbix_proxygroups = zabbix.proxygroup.get(
output=["proxy_groupid", "name"])
if str(zabbix.version).startswith("7"):
zabbix_proxygroups = zabbix.proxygroup.get(output=["proxy_groupid", "name"])
# Sanitize proxy data
if proxy_name == "host":
for proxy in zabbix_proxies:
proxy['name'] = proxy.pop('host')
proxy["name"] = proxy.pop("host")
# Prepare list of all proxy and proxy_groups
zabbix_proxy_list = proxy_prepper(zabbix_proxies, zabbix_proxygroups)
# Get NetBox API version
nb_version = netbox.version
# Go through all NetBox devices
for nb_vm in netbox_vms:
try:
vm = VirtualMachine(nb_vm, zabbix, netbox_journals, nb_version,
create_journal, logger)
config["create_journal"], logger)
logger.debug(f"Host {vm.name}: started operations on VM.")
vm.set_vm_template()
# Check if a valid template has been found for this VM.
if not vm.zbx_template_names:
continue
vm.set_hostgroup(vm_hostgroup_format,
vm.set_hostgroup(config["vm_hostgroup_format"],
netbox_site_groups, netbox_regions)
# Check if a valid hostgroup has been found for this VM.
if not vm.hostgroup:
if not vm.hostgroups:
continue
vm.set_inventory(nb_vm)
vm.set_usermacros()
vm.set_tags()
# Checks if device is in cleanup state
if vm.status in zabbix_device_removal:
if vm.status in config["zabbix_device_removal"]:
if vm.zabbix_id:
# Delete device from Zabbix
# and remove hostID from NetBox.
vm.cleanup()
logger.info(f"VM {vm.name}: cleanup complete")
logger.debug(f"VM {vm.name}: cleanup complete")
continue
# Device has been added to NetBox
# but is not in Activate state
logger.info(f"VM {vm.name}: skipping since this VM is "
f"not in the active state.")
logger.info(
f"VM {vm.name}: skipping since this VM is "
f"not in the active state."
)
continue
# Check if the VM is in the disabled state
if vm.status in zabbix_device_disable:
if vm.status in config["zabbix_device_disable"]:
vm.zabbix_state = 1
# Check if VM is already in Zabbix
if vm.zabbix_id:
vm.ConsistencyCheck(zabbix_groups, zabbix_templates,
zabbix_proxy_list, full_proxy_sync,
create_hostgroups)
continue
# Add hostgroup is config is set
if create_hostgroups:
# Add hostgroup if config is set
if config["create_hostgroups"]:
# Create new hostgroup. Potentially multiple groups if nested
hostgroups = vm.createZabbixHostgroup(zabbix_groups)
# go through all newly created hostgroups
for group in hostgroups:
# Add new hostgroups to zabbix group list
zabbix_groups.append(group)
# Check if VM is already in Zabbix
if vm.zabbix_id:
vm.ConsistencyCheck(
zabbix_groups,
zabbix_templates,
zabbix_proxy_list,
config["full_proxy_sync"],
config["create_hostgroups"],
)
continue
# Add VM to Zabbix
vm.createInZabbix(zabbix_groups, zabbix_templates,
zabbix_proxy_list)
vm.createInZabbix(zabbix_groups, zabbix_templates, zabbix_proxy_list)
except SyncError:
pass
@ -211,36 +201,39 @@ def main(arguments):
try:
# Set device instance set data such as hostgroup and template information.
device = PhysicalDevice(nb_device, zabbix, netbox_journals, nb_version,
create_journal, logger)
config["create_journal"], logger)
logger.debug(f"Host {device.name}: started operations on device.")
device.set_template(templates_config_context,
templates_config_context_overrule)
device.set_template(config["templates_config_context"],
config["templates_config_context_overrule"])
# Check if a valid template has been found for this device.
if not device.zbx_template_names:
continue
device.set_hostgroup(
hostgroup_format, netbox_site_groups, netbox_regions)
config["hostgroup_format"], netbox_site_groups, netbox_regions)
# Check if a valid hostgroup has been found for this device.
if not device.hostgroup:
if not device.hostgroups:
continue
device.set_inventory(nb_device)
device.set_usermacros()
device.set_tags()
# Checks if device is part of cluster.
# Requires clustering variable
if device.isCluster() and clustering:
if device.isCluster() and config["clustering"]:
# Check if device is primary or secondary
if device.promoteMasterDevice():
e = (f"Device {device.name}: is "
f"part of cluster and primary.")
e = f"Device {device.name}: is " f"part of cluster and primary."
logger.info(e)
else:
# Device is secondary in cluster.
# Don't continue with this device.
e = (f"Device {device.name}: is part of cluster "
f"but not primary. Skipping this host...")
e = (
f"Device {device.name}: is part of cluster "
f"but not primary. Skipping this host..."
)
logger.info(e)
continue
# Checks if device is in cleanup state
if device.status in zabbix_device_removal:
if device.status in config["zabbix_device_removal"]:
if device.zabbix_id:
# Delete device from Zabbix
# and remove hostID from NetBox.
@ -249,38 +242,55 @@ def main(arguments):
continue
# Device has been added to NetBox
# but is not in Activate state
logger.info(f"Device {device.name}: skipping since this device is "
f"not in the active state.")
logger.info(
f"Device {device.name}: skipping since this device is "
f"not in the active state."
)
continue
# Check if the device is in the disabled state
if device.status in zabbix_device_disable:
if device.status in config["zabbix_device_disable"]:
device.zabbix_state = 1
# Check if device is already in Zabbix
if device.zabbix_id:
device.ConsistencyCheck(zabbix_groups, zabbix_templates,
zabbix_proxy_list, full_proxy_sync,
create_hostgroups)
continue
# Add hostgroup if config is set
if create_hostgroups:
if config["create_hostgroups"]:
# Create new hostgroup. Potentially multiple groups if nested
hostgroups = device.createZabbixHostgroup(zabbix_groups)
# go through all newly created hostgroups
for group in hostgroups:
# Add new hostgroups to zabbix group list
zabbix_groups.append(group)
# Check if device is already in Zabbix
if device.zabbix_id:
device.ConsistencyCheck(
zabbix_groups,
zabbix_templates,
zabbix_proxy_list,
config["full_proxy_sync"],
config["create_hostgroups"],
)
continue
# Add device to Zabbix
device.createInZabbix(zabbix_groups, zabbix_templates,
zabbix_proxy_list)
device.createInZabbix(zabbix_groups, zabbix_templates, zabbix_proxy_list)
except SyncError:
pass
zabbix.logout()
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description='A script to sync Zabbix with NetBox device data.'
description="A script to sync Zabbix with NetBox device data."
)
parser.add_argument("-v", "--verbose", help="Turn on debugging.",
action="store_true")
parser.add_argument(
"-v", "--verbose", help="Turn on debugging.", action="store_true"
)
parser.add_argument(
"-vv", "--debug", help="Turn on debugging.", action="store_true"
)
parser.add_argument(
"-vvv",
"--debug-all",
help="Turn on debugging for all modules.",
action="store_true",
)
parser.add_argument("-q", "--quiet", help="Turn off warnings.", action="store_true")
args = parser.parse_args()
main(args)
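The env_vars check near the top of main() requires ZABBIX_HOST, NETBOX_HOST and NETBOX_TOKEN, plus either a Zabbix API token or user/password credentials. Expressed in Python for illustration only, with placeholder hostnames and tokens:

import os
os.environ.setdefault("NETBOX_HOST", "https://netbox.example.com")
os.environ.setdefault("NETBOX_TOKEN", "0123456789abcdef")
os.environ.setdefault("ZABBIX_HOST", "https://zabbix.example.com")
os.environ.setdefault("ZABBIX_TOKEN", "zabbix-api-token")  # or user/password credentials instead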

View File

@ -1,2 +1,2 @@
pynetbox
zabbix-utils==2.0.1
pynetbox==7.4.1
zabbix-utils==2.0.2

0
tests/__init__.py Normal file
View File

View File

@ -0,0 +1,139 @@
"""Tests for configuration parsing in the modules.config module."""
from unittest.mock import patch, MagicMock
import os
from modules.config import load_config, DEFAULT_CONFIG, load_config_file, load_env_variable
def test_load_config_defaults():
"""Test that load_config returns default values when no config file or env vars are present"""
with patch('modules.config.load_config_file', return_value=DEFAULT_CONFIG.copy()), \
patch('modules.config.load_env_variable', return_value=None):
config = load_config()
assert config == DEFAULT_CONFIG
assert config["templates_config_context"] is False
assert config["create_hostgroups"] is True
def test_load_config_file():
"""Test that load_config properly loads values from config file"""
mock_config = DEFAULT_CONFIG.copy()
mock_config["templates_config_context"] = True
mock_config["sync_vms"] = True
with patch('modules.config.load_config_file', return_value=mock_config), \
patch('modules.config.load_env_variable', return_value=None):
config = load_config()
assert config["templates_config_context"] is True
assert config["sync_vms"] is True
# Unchanged values should remain as defaults
assert config["create_journal"] is False
def test_load_env_variables():
"""Test that load_config properly loads values from environment variables"""
# Mock env variable loading to return values for specific keys
def mock_load_env(key):
if key == "sync_vms":
return True
if key == "create_journal":
return True
return None
with patch('modules.config.load_config_file', return_value=DEFAULT_CONFIG.copy()), \
patch('modules.config.load_env_variable', side_effect=mock_load_env):
config = load_config()
assert config["sync_vms"] is True
assert config["create_journal"] is True
# Unchanged values should remain as defaults
assert config["templates_config_context"] is False
def test_env_vars_override_config_file():
"""Test that environment variables override values from config file"""
mock_config = DEFAULT_CONFIG.copy()
mock_config["templates_config_context"] = True
mock_config["sync_vms"] = False
# Mock env variable that will override the config file value
def mock_load_env(key):
if key == "sync_vms":
return True
return None
with patch('modules.config.load_config_file', return_value=mock_config), \
patch('modules.config.load_env_variable', side_effect=mock_load_env):
config = load_config()
# This should be overridden by the env var
assert config["sync_vms"] is True
# This should remain from the config file
assert config["templates_config_context"] is True
def test_load_config_file_function():
"""Test the load_config_file function directly"""
# Test when the file exists
with patch('pathlib.Path.exists', return_value=True), \
patch('importlib.util.spec_from_file_location') as mock_spec:
# Setup the mock module with attributes
mock_module = MagicMock()
mock_module.templates_config_context = True
mock_module.sync_vms = True
# Setup the mock spec
mock_spec_instance = MagicMock()
mock_spec.return_value = mock_spec_instance
mock_spec_instance.loader.exec_module = lambda x: None
# Patch module_from_spec to return our mock module
with patch('importlib.util.module_from_spec', return_value=mock_module):
config = load_config_file(DEFAULT_CONFIG.copy())
assert config["templates_config_context"] is True
assert config["sync_vms"] is True
def test_load_config_file_not_found():
"""Test load_config_file when the config file doesn't exist"""
with patch('pathlib.Path.exists', return_value=False):
result = load_config_file(DEFAULT_CONFIG.copy())
# Should return a dict equal to DEFAULT_CONFIG, not a new object
assert result == DEFAULT_CONFIG
def test_load_env_variable_function():
"""Test the load_env_variable function directly"""
# Create a real environment variable for testing with correct prefix and uppercase
test_var = "NBZX_TEMPLATES_CONFIG_CONTEXT"
original_env = os.environ.get(test_var, None)
try:
# Set the environment variable with the proper prefix and case
os.environ[test_var] = "True"
# Test that it's properly read (using lowercase in the function call)
value = load_env_variable("templates_config_context")
assert value == "True"
# Test when the environment variable doesn't exist
value = load_env_variable("nonexistent_variable")
assert value is None
finally:
# Clean up - restore original environment
if original_env is not None:
os.environ[test_var] = original_env
else:
os.environ.pop(test_var, None)
def test_load_config_file_exception_handling():
"""Test that load_config_file handles exceptions gracefully"""
# This test requires modifying the load_config_file function to handle exceptions
# For now, we're just checking that an exception is raised
with patch('pathlib.Path.exists', return_value=True), \
patch('importlib.util.spec_from_file_location', side_effect=Exception("Import error")):
# Since the current implementation doesn't handle exceptions, we should
# expect an exception to be raised
try:
load_config_file(DEFAULT_CONFIG.copy())
assert False, "An exception should have been raised"
except Exception: # pylint: disable=broad-except
# This is expected
pass
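A hedged illustration of the override order these tests exercise: built-in defaults, then an optional config.py, then NBZX_-prefixed environment variables. The values shown are examples:

import os
from modules.config import load_env_variable, load_config

os.environ["NBZX_SYNC_VMS"] = "True"      # NBZX_ prefix + uppercase key, as in the tests
print(load_env_variable("sync_vms"))      # "True"
config = load_config()                    # the env value now overrides config.py and the defaults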

View File

@ -0,0 +1,166 @@
"""Tests for device deletion functionality in the PhysicalDevice class."""
import unittest
from unittest.mock import MagicMock, patch
from zabbix_utils import APIRequestError
from modules.device import PhysicalDevice
from modules.exceptions import SyncExternalError
class TestDeviceDeletion(unittest.TestCase):
"""Test class for device deletion functionality."""
def setUp(self):
"""Set up test fixtures."""
# Create mock NetBox device
self.mock_nb_device = MagicMock()
self.mock_nb_device.id = 123
self.mock_nb_device.name = "test-device"
self.mock_nb_device.status.label = "Decommissioning"
self.mock_nb_device.custom_fields = {"zabbix_hostid": "456"}
self.mock_nb_device.config_context = {}
# Set up a primary IP
primary_ip = MagicMock()
primary_ip.address = "192.168.1.1/24"
self.mock_nb_device.primary_ip = primary_ip
# Create mock Zabbix API
self.mock_zabbix = MagicMock()
self.mock_zabbix.version = "6.0"
# Set up mock host.get response
self.mock_zabbix.host.get.return_value = [{"hostid": "456"}]
# Mock NetBox journal class
self.mock_nb_journal = MagicMock()
# Create logger mock
self.mock_logger = MagicMock()
# Create PhysicalDevice instance with mocks
with patch('modules.device.config', {"device_cf": "zabbix_hostid"}):
self.device = PhysicalDevice(
self.mock_nb_device,
self.mock_zabbix,
self.mock_nb_journal,
"3.0",
journal=True,
logger=self.mock_logger
)
def test_cleanup_successful_deletion(self):
"""Test successful device deletion from Zabbix."""
# Setup
self.mock_zabbix.host.get.return_value = [{"hostid": "456"}]
self.mock_zabbix.host.delete.return_value = {"hostids": ["456"]}
# Execute
self.device.cleanup()
# Verify
self.mock_zabbix.host.get.assert_called_once_with(filter={'hostid': '456'}, output=[])
self.mock_zabbix.host.delete.assert_called_once_with('456')
self.mock_nb_device.save.assert_called_once()
self.assertIsNone(self.mock_nb_device.custom_fields["zabbix_hostid"])
self.mock_logger.info.assert_called_with(f"Host {self.device.name}: "
"Deleted host from Zabbix.")
def test_cleanup_device_already_deleted(self):
"""Test cleanup when device is already deleted from Zabbix."""
# Setup
self.mock_zabbix.host.get.return_value = [] # Empty list means host not found
# Execute
self.device.cleanup()
# Verify
self.mock_zabbix.host.get.assert_called_once_with(filter={'hostid': '456'}, output=[])
self.mock_zabbix.host.delete.assert_not_called()
self.mock_nb_device.save.assert_called_once()
self.assertIsNone(self.mock_nb_device.custom_fields["zabbix_hostid"])
self.mock_logger.info.assert_called_with(
f"Host {self.device.name}: was already deleted from Zabbix. Removed link in NetBox.")
def test_cleanup_api_error(self):
"""Test cleanup when Zabbix API returns an error."""
# Setup
self.mock_zabbix.host.get.return_value = [{"hostid": "456"}]
self.mock_zabbix.host.delete.side_effect = APIRequestError("API Error")
# Execute and verify
with self.assertRaises(SyncExternalError):
self.device.cleanup()
# Verify correct calls were made
self.mock_zabbix.host.get.assert_called_once_with(filter={'hostid': '456'}, output=[])
self.mock_zabbix.host.delete.assert_called_once_with('456')
self.mock_nb_device.save.assert_not_called()
self.mock_logger.error.assert_called()
def test_zeroize_cf(self):
"""Test _zeroize_cf method that clears the custom field."""
# Execute
self.device._zeroize_cf() # pylint: disable=protected-access
# Verify
self.assertIsNone(self.mock_nb_device.custom_fields["zabbix_hostid"])
self.mock_nb_device.save.assert_called_once()
def test_create_journal_entry(self):
"""Test create_journal_entry method."""
# Setup
test_message = "Test journal entry"
# Execute
result = self.device.create_journal_entry("info", test_message)
# Verify
self.assertTrue(result)
self.mock_nb_journal.create.assert_called_once()
journal_entry = self.mock_nb_journal.create.call_args[0][0]
self.assertEqual(journal_entry["assigned_object_type"], "dcim.device")
self.assertEqual(journal_entry["assigned_object_id"], 123)
self.assertEqual(journal_entry["kind"], "info")
self.assertEqual(journal_entry["comments"], test_message)
def test_create_journal_entry_invalid_severity(self):
"""Test create_journal_entry with invalid severity."""
# Execute
result = self.device.create_journal_entry("invalid", "Test message")
# Verify
self.assertFalse(result)
self.mock_nb_journal.create.assert_not_called()
self.mock_logger.warning.assert_called()
def test_create_journal_entry_when_disabled(self):
"""Test create_journal_entry when journaling is disabled."""
# Setup - create device with journal=False
with patch('modules.device.config', {"device_cf": "zabbix_hostid"}):
device = PhysicalDevice(
self.mock_nb_device,
self.mock_zabbix,
self.mock_nb_journal,
"3.0",
journal=False, # Disable journaling
logger=self.mock_logger
)
# Execute
result = device.create_journal_entry("info", "Test message")
# Verify
self.assertFalse(result)
self.mock_nb_journal.create.assert_not_called()
def test_cleanup_updates_journal(self):
"""Test that cleanup method creates a journal entry."""
# Setup
self.mock_zabbix.host.get.return_value = [{"hostid": "456"}]
# Execute
with patch.object(self.device, 'create_journal_entry') as mock_journal_entry:
self.device.cleanup()
# Verify
mock_journal_entry.assert_called_once_with("warning", "Deleted host from Zabbix")

247
tests/test_interface.py Normal file
View File

@ -0,0 +1,247 @@
"""Tests for the ZabbixInterface class in the interface module."""
import unittest
from modules.interface import ZabbixInterface
from modules.exceptions import InterfaceConfigError
class TestZabbixInterface(unittest.TestCase):
"""Test class for ZabbixInterface functionality."""
def setUp(self):
"""Set up test fixtures."""
self.test_ip = "192.168.1.1"
self.empty_context = {}
self.default_interface = ZabbixInterface(self.empty_context, self.test_ip)
# Create some test contexts for different scenarios
self.snmpv2_context = {
"zabbix": {
"interface_type": 2,
"interface_port": "161",
"snmp": {
"version": 2,
"community": "public",
"bulk": 1
}
}
}
self.snmpv3_context = {
"zabbix": {
"interface_type": 2,
"snmp": {
"version": 3,
"securityname": "snmpuser",
"securitylevel": "authPriv",
"authprotocol": "SHA",
"authpassphrase": "authpass123",
"privprotocol": "AES",
"privpassphrase": "privpass123",
"contextname": "context1"
}
}
}
self.agent_context = {
"zabbix": {
"interface_type": 1,
"interface_port": "10050"
}
}
def test_init(self):
"""Test initialization of ZabbixInterface."""
interface = ZabbixInterface(self.empty_context, self.test_ip)
# Check basic properties
self.assertEqual(interface.ip, self.test_ip)
self.assertEqual(interface.context, self.empty_context)
self.assertEqual(interface.interface["ip"], self.test_ip)
self.assertEqual(interface.interface["main"], "1")
self.assertEqual(interface.interface["useip"], "1")
self.assertEqual(interface.interface["dns"], "")
def test_get_context_empty(self):
"""Test get_context with empty context."""
interface = ZabbixInterface(self.empty_context, self.test_ip)
result = interface.get_context()
self.assertFalse(result)
def test_get_context_with_interface_type(self):
"""Test get_context with interface_type but no port."""
context = {"zabbix": {"interface_type": 2}}
interface = ZabbixInterface(context, self.test_ip)
# Should set type and default port
result = interface.get_context()
self.assertTrue(result)
self.assertEqual(interface.interface["type"], 2)
self.assertEqual(interface.interface["port"], "161") # Default port for SNMP
def test_get_context_with_interface_type_and_port(self):
"""Test get_context with both interface_type and port."""
context = {"zabbix": {"interface_type": 1, "interface_port": "12345"}}
interface = ZabbixInterface(context, self.test_ip)
# Should set type and specified port
result = interface.get_context()
self.assertTrue(result)
self.assertEqual(interface.interface["type"], 1)
self.assertEqual(interface.interface["port"], "12345")
def test_set_default_port(self):
"""Test _set_default_port for different interface types."""
interface = ZabbixInterface(self.empty_context, self.test_ip)
# Test for agent type (1)
interface.interface["type"] = 1
interface._set_default_port() # pylint: disable=protected-access
self.assertEqual(interface.interface["port"], "10050")
# Test for SNMP type (2)
interface.interface["type"] = 2
interface._set_default_port() # pylint: disable=protected-access
self.assertEqual(interface.interface["port"], "161")
# Test for IPMI type (3)
interface.interface["type"] = 3
interface._set_default_port() # pylint: disable=protected-access
self.assertEqual(interface.interface["port"], "623")
# Test for JMX type (4)
interface.interface["type"] = 4
interface._set_default_port() # pylint: disable=protected-access
self.assertEqual(interface.interface["port"], "12345")
# Test for unsupported type
interface.interface["type"] = 99
result = interface._set_default_port() # pylint: disable=protected-access
self.assertFalse(result)
def test_set_snmp_v2(self):
"""Test set_snmp with SNMPv2 configuration."""
interface = ZabbixInterface(self.snmpv2_context, self.test_ip)
interface.get_context() # Set the interface type
# Call set_snmp
interface.set_snmp()
# Check SNMP details
self.assertEqual(interface.interface["details"]["version"], "2")
self.assertEqual(interface.interface["details"]["community"], "public")
self.assertEqual(interface.interface["details"]["bulk"], "1")
def test_set_snmp_v3(self):
"""Test set_snmp with SNMPv3 configuration."""
interface = ZabbixInterface(self.snmpv3_context, self.test_ip)
interface.get_context() # Set the interface type
# Call set_snmp
interface.set_snmp()
# Check SNMP details
self.assertEqual(interface.interface["details"]["version"], "3")
self.assertEqual(interface.interface["details"]["securityname"], "snmpuser")
self.assertEqual(interface.interface["details"]["securitylevel"], "authPriv")
self.assertEqual(interface.interface["details"]["authprotocol"], "SHA")
self.assertEqual(interface.interface["details"]["authpassphrase"], "authpass123")
self.assertEqual(interface.interface["details"]["privprotocol"], "AES")
self.assertEqual(interface.interface["details"]["privpassphrase"], "privpass123")
self.assertEqual(interface.interface["details"]["contextname"], "context1")
def test_set_snmp_no_snmp_config(self):
"""Test set_snmp with missing SNMP configuration."""
# Create context with interface type but no SNMP config
context = {"zabbix": {"interface_type": 2}}
interface = ZabbixInterface(context, self.test_ip)
interface.get_context() # Set the interface type
# Call set_snmp - should raise exception
with self.assertRaises(InterfaceConfigError):
interface.set_snmp()
def test_set_snmp_unsupported_version(self):
"""Test set_snmp with unsupported SNMP version."""
# Create context with invalid SNMP version
context = {
"zabbix": {
"interface_type": 2,
"snmp": {
"version": 4 # Invalid version
}
}
}
interface = ZabbixInterface(context, self.test_ip)
interface.get_context() # Set the interface type
# Call set_snmp - should raise exception
with self.assertRaises(InterfaceConfigError):
interface.set_snmp()
def test_set_snmp_no_version(self):
"""Test set_snmp with missing SNMP version."""
# Create context without SNMP version
context = {
"zabbix": {
"interface_type": 2,
"snmp": {
"community": "public" # No version specified
}
}
}
interface = ZabbixInterface(context, self.test_ip)
interface.get_context() # Set the interface type
# Call set_snmp - should raise exception
with self.assertRaises(InterfaceConfigError):
interface.set_snmp()
def test_set_snmp_non_snmp_interface(self):
"""Test set_snmp with non-SNMP interface type."""
interface = ZabbixInterface(self.agent_context, self.test_ip)
interface.get_context() # Set the interface type
# Call set_snmp - should raise exception
with self.assertRaises(InterfaceConfigError):
interface.set_snmp()
def test_set_default_snmp(self):
"""Test set_default_snmp method."""
interface = ZabbixInterface(self.empty_context, self.test_ip)
interface.set_default_snmp()
# Check interface properties
self.assertEqual(interface.interface["type"], "2")
self.assertEqual(interface.interface["port"], "161")
self.assertEqual(interface.interface["details"]["version"], "2")
self.assertEqual(interface.interface["details"]["community"], "{$SNMP_COMMUNITY}")
self.assertEqual(interface.interface["details"]["bulk"], "1")
def test_set_default_agent(self):
"""Test set_default_agent method."""
interface = ZabbixInterface(self.empty_context, self.test_ip)
interface.set_default_agent()
# Check interface properties
self.assertEqual(interface.interface["type"], "1")
self.assertEqual(interface.interface["port"], "10050")
def test_snmpv2_no_community(self):
"""Test SNMPv2 with no community string specified."""
# Create context with SNMPv2 but no community
context = {
"zabbix": {
"interface_type": 2,
"snmp": {
"version": 2
}
}
}
interface = ZabbixInterface(context, self.test_ip)
interface.get_context() # Set the interface type
# Call set_snmp
interface.set_snmp()
# Should use default community string
self.assertEqual(interface.interface["details"]["community"], "{$SNMP_COMMUNITY}")

429
tests/test_device.py Normal file
View File

@ -0,0 +1,429 @@
"""Tests for the PhysicalDevice class in the device module."""
import unittest
from unittest.mock import MagicMock, patch
from modules.device import PhysicalDevice
from modules.exceptions import TemplateError, SyncInventoryError
class TestPhysicalDevice(unittest.TestCase):
"""Test class for PhysicalDevice functionality."""
def setUp(self):
"""Set up test fixtures."""
# Create mock NetBox device
self.mock_nb_device = MagicMock()
self.mock_nb_device.id = 123
self.mock_nb_device.name = "test-device"
self.mock_nb_device.status.label = "Active"
self.mock_nb_device.custom_fields = {"zabbix_hostid": None}
self.mock_nb_device.config_context = {}
# Set up a primary IP
primary_ip = MagicMock()
primary_ip.address = "192.168.1.1/24"
self.mock_nb_device.primary_ip = primary_ip
# Create mock Zabbix API
self.mock_zabbix = MagicMock()
self.mock_zabbix.version = "6.0"
# Mock NetBox journal class
self.mock_nb_journal = MagicMock()
# Create logger mock
self.mock_logger = MagicMock()
# Create PhysicalDevice instance with mocks
with patch('modules.device.config',
{"device_cf": "zabbix_hostid",
"template_cf": "zabbix_template",
"templates_config_context": False,
"templates_config_context_overrule": False,
"traverse_regions": False,
"traverse_site_groups": False,
"inventory_mode": "disabled",
"inventory_sync": False,
"device_inventory_map": {}
}):
self.device = PhysicalDevice(
self.mock_nb_device,
self.mock_zabbix,
self.mock_nb_journal,
"3.0",
journal=True,
logger=self.mock_logger
)
def test_init(self):
"""Test the initialization of the PhysicalDevice class."""
# Check that basic properties are set correctly
self.assertEqual(self.device.name, "test-device")
self.assertEqual(self.device.id, 123)
self.assertEqual(self.device.status, "Active")
self.assertEqual(self.device.ip, "192.168.1.1")
self.assertEqual(self.device.cidr, "192.168.1.1/24")
def test_init_no_primary_ip(self):
"""Test initialization when device has no primary IP."""
# Set primary_ip to None
self.mock_nb_device.primary_ip = None
# Creating device should raise SyncInventoryError
with patch('modules.device.config', {"device_cf": "zabbix_hostid"}):
with self.assertRaises(SyncInventoryError):
PhysicalDevice(
self.mock_nb_device,
self.mock_zabbix,
self.mock_nb_journal,
"3.0",
logger=self.mock_logger
)
def test_set_basics_with_special_characters(self):
"""Test _setBasics when device name contains special characters."""
# Set name with special characters that
# will actually trigger the special character detection
self.mock_nb_device.name = "test-devïce"
# We need to patch the search function to simulate finding special characters
with patch('modules.device.search') as mock_search, \
patch('modules.device.config', {"device_cf": "zabbix_hostid"}):
# Make the search function return True to simulate special characters
mock_search.return_value = True
device = PhysicalDevice(
self.mock_nb_device,
self.mock_zabbix,
self.mock_nb_journal,
"3.0",
logger=self.mock_logger
)
# With the mocked search function, the name should be changed to NETBOX_ID format
self.assertEqual(device.name, f"NETBOX_ID{self.mock_nb_device.id}")
# And visible_name should be set to the original name
self.assertEqual(device.visible_name, "test-devïce")
# use_visible_name flag should be set
self.assertTrue(device.use_visible_name)
def test_get_templates_context(self):
"""Test get_templates_context with valid config."""
# Set up config_context with valid template data
self.mock_nb_device.config_context = {
"zabbix": {
"templates": ["Template1", "Template2"]
}
}
# Create device with the updated mock
with patch('modules.device.config', {"device_cf": "zabbix_hostid"}):
device = PhysicalDevice(
self.mock_nb_device,
self.mock_zabbix,
self.mock_nb_journal,
"3.0",
logger=self.mock_logger
)
# Test that templates are returned correctly
templates = device.get_templates_context()
self.assertEqual(templates, ["Template1", "Template2"])
def test_get_templates_context_with_string(self):
"""Test get_templates_context with a string instead of list."""
# Set up config_context with a string template
self.mock_nb_device.config_context = {
"zabbix": {
"templates": "Template1"
}
}
# Create device with the updated mock
with patch('modules.device.config', {"device_cf": "zabbix_hostid"}):
device = PhysicalDevice(
self.mock_nb_device,
self.mock_zabbix,
self.mock_nb_journal,
"3.0",
logger=self.mock_logger
)
# Test that template is wrapped in a list
templates = device.get_templates_context()
self.assertEqual(templates, ["Template1"])
def test_get_templates_context_no_zabbix_key(self):
"""Test get_templates_context when zabbix key is missing."""
# Set up config_context without zabbix key
self.mock_nb_device.config_context = {}
# Create device with the updated mock
with patch('modules.device.config', {"device_cf": "zabbix_hostid"}):
device = PhysicalDevice(
self.mock_nb_device,
self.mock_zabbix,
self.mock_nb_journal,
"3.0",
logger=self.mock_logger
)
# Test that TemplateError is raised
with self.assertRaises(TemplateError):
device.get_templates_context()
def test_get_templates_context_no_templates_key(self):
"""Test get_templates_context when templates key is missing."""
# Set up config_context without templates key
self.mock_nb_device.config_context = {"zabbix": {}}
# Create device with the updated mock
with patch('modules.device.config', {"device_cf": "zabbix_hostid"}):
device = PhysicalDevice(
self.mock_nb_device,
self.mock_zabbix,
self.mock_nb_journal,
"3.0",
logger=self.mock_logger
)
# Test that TemplateError is raised
with self.assertRaises(TemplateError):
device.get_templates_context()
def test_set_template_with_config_context(self):
"""Test set_template with templates_config_context=True."""
# Set up config_context with templates
self.mock_nb_device.config_context = {
"zabbix": {
"templates": ["Template1"]
}
}
# Mock get_templates_context to return expected templates
with patch.object(PhysicalDevice, 'get_templates_context', return_value=["Template1"]):
with patch('modules.device.config', {"device_cf": "zabbix_hostid"}):
device = PhysicalDevice(
self.mock_nb_device,
self.mock_zabbix,
self.mock_nb_journal,
"3.0",
logger=self.mock_logger
)
# Call set_template with prefer_config_context=True
result = device.set_template(prefer_config_context=True, overrule_custom=False)
# Check result and template names
self.assertTrue(result)
self.assertEqual(device.zbx_template_names, ["Template1"])
def test_set_inventory_disabled_mode(self):
"""Test set_inventory with inventory_mode=disabled."""
# Configure with disabled inventory mode
config_patch = {
"device_cf": "zabbix_hostid",
"inventory_mode": "disabled",
"inventory_sync": False
}
with patch('modules.device.config', config_patch):
device = PhysicalDevice(
self.mock_nb_device,
self.mock_zabbix,
self.mock_nb_journal,
"3.0",
logger=self.mock_logger
)
# Call set_inventory with the config patch still active
with patch('modules.device.config', config_patch):
result = device.set_inventory({})
# Check result
self.assertTrue(result)
# Default value for disabled inventory
self.assertEqual(device.inventory_mode, -1)
def test_set_inventory_manual_mode(self):
"""Test set_inventory with inventory_mode=manual."""
# Configure with manual inventory mode
config_patch = {
"device_cf": "zabbix_hostid",
"inventory_mode": "manual",
"inventory_sync": False
}
with patch('modules.device.config', config_patch):
device = PhysicalDevice(
self.mock_nb_device,
self.mock_zabbix,
self.mock_nb_journal,
"3.0",
logger=self.mock_logger
)
# Call set_inventory with the config patch still active
with patch('modules.device.config', config_patch):
result = device.set_inventory({})
# Check result
self.assertTrue(result)
self.assertEqual(device.inventory_mode, 0) # Manual mode
def test_set_inventory_automatic_mode(self):
"""Test set_inventory with inventory_mode=automatic."""
# Configure with automatic inventory mode
config_patch = {
"device_cf": "zabbix_hostid",
"inventory_mode": "automatic",
"inventory_sync": False
}
with patch('modules.device.config', config_patch):
device = PhysicalDevice(
self.mock_nb_device,
self.mock_zabbix,
self.mock_nb_journal,
"3.0",
logger=self.mock_logger
)
# Call set_inventory with the config patch still active
with patch('modules.device.config', config_patch):
result = device.set_inventory({})
# Check result
self.assertTrue(result)
self.assertEqual(device.inventory_mode, 1) # Automatic mode
def test_set_inventory_with_inventory_sync(self):
"""Test set_inventory with inventory_sync=True."""
# Configure with inventory sync enabled
config_patch = {
"device_cf": "zabbix_hostid",
"inventory_mode": "manual",
"inventory_sync": True,
"device_inventory_map": {
"name": "name",
"serial": "serialno_a"
}
}
with patch('modules.device.config', config_patch):
device = PhysicalDevice(
self.mock_nb_device,
self.mock_zabbix,
self.mock_nb_journal,
"3.0",
logger=self.mock_logger
)
# Create a mock device with the required attributes
mock_device_data = {
"name": "test-device",
"serial": "ABC123"
}
# Call set_inventory with the config patch still active
with patch('modules.device.config', config_patch):
result = device.set_inventory(mock_device_data)
# Check result
self.assertTrue(result)
self.assertEqual(device.inventory_mode, 0) # Manual mode
self.assertEqual(device.inventory, {
"name": "test-device",
"serialno_a": "ABC123"
})
    def test_is_cluster_true(self):
"""Test isCluster when device is part of a cluster."""
# Set up virtual_chassis
self.mock_nb_device.virtual_chassis = MagicMock()
# Create device with the updated mock
with patch('modules.device.config', {"device_cf": "zabbix_hostid"}):
device = PhysicalDevice(
self.mock_nb_device,
self.mock_zabbix,
self.mock_nb_journal,
"3.0",
logger=self.mock_logger
)
# Check isCluster result
self.assertTrue(device.isCluster())
def test_is_cluster_false(self):
"""Test isCluster when device is not part of a cluster."""
# Set virtual_chassis to None
self.mock_nb_device.virtual_chassis = None
# Create device with the updated mock
with patch('modules.device.config', {"device_cf": "zabbix_hostid"}):
device = PhysicalDevice(
self.mock_nb_device,
self.mock_zabbix,
self.mock_nb_journal,
"3.0",
logger=self.mock_logger
)
# Check isCluster result
self.assertFalse(device.isCluster())
def test_promote_master_device_primary(self):
"""Test promoteMasterDevice when device is primary in cluster."""
# Set up virtual chassis with master device
mock_vc = MagicMock()
mock_vc.name = "virtual-chassis-1"
mock_master = MagicMock()
mock_master.id = self.mock_nb_device.id # Set master ID to match the current device
mock_vc.master = mock_master
self.mock_nb_device.virtual_chassis = mock_vc
# Create device with the updated mock
device = PhysicalDevice(
self.mock_nb_device,
self.mock_zabbix,
self.mock_nb_journal,
"3.0",
logger=self.mock_logger
)
# Call promoteMasterDevice and check the result
result = device.promoteMasterDevice()
# Should return True for primary device
self.assertTrue(result)
# Device name should be updated to virtual chassis name
self.assertEqual(device.name, "virtual-chassis-1")
def test_promote_master_device_secondary(self):
"""Test promoteMasterDevice when device is secondary in cluster."""
# Set up virtual chassis with a different master device
mock_vc = MagicMock()
mock_vc.name = "virtual-chassis-1"
mock_master = MagicMock()
mock_master.id = self.mock_nb_device.id + 1 # Different ID than the current device
mock_vc.master = mock_master
self.mock_nb_device.virtual_chassis = mock_vc
# Create device with the updated mock
device = PhysicalDevice(
self.mock_nb_device,
self.mock_zabbix,
self.mock_nb_journal,
"3.0",
logger=self.mock_logger
)
# Call promoteMasterDevice and check the result
result = device.promoteMasterDevice()
# Should return False for secondary device
self.assertFalse(result)
# Device name should not be modified
self.assertEqual(device.name, "test-device")

62
tests/test_tools.py Normal file
View File

@ -0,0 +1,62 @@
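"""Tests for the sanatize_log_output function in the tools module."""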
from modules.tools import sanatize_log_output
def test_sanatize_log_output_secrets():
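    """Test that secret macro values are masked while plain macro values are kept."""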
data = {
"macros": [
{"macro": "{$SECRET}", "type": "1", "value": "supersecret"},
{"macro": "{$PLAIN}", "type": "0", "value": "notsecret"},
]
}
sanitized = sanatize_log_output(data)
assert sanitized["macros"][0]["value"] == "********"
assert sanitized["macros"][1]["value"] == "notsecret"
def test_sanatize_log_output_interface_secrets():
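    """Test that sensitive interface detail fields are masked and interfaceid is removed."""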
data = {
"interfaceid": 123,
"details": {
"authpassphrase": "supersecret",
"privpassphrase": "anothersecret",
"securityname": "sensitiveuser",
"community": "public",
"other": "normalvalue"
}
}
sanitized = sanatize_log_output(data)
# Sensitive fields should be sanitized
assert sanitized["details"]["authpassphrase"] == "********"
assert sanitized["details"]["privpassphrase"] == "********"
assert sanitized["details"]["securityname"] == "********"
    # Community strings are also treated as sensitive and masked
    assert sanitized["details"]["community"] == "********"
    # Non-sensitive fields should remain unchanged
    assert sanitized["details"]["other"] == "normalvalue"
# interfaceid should be removed
assert "interfaceid" not in sanitized
def test_sanatize_log_output_interface_macros():
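    """Test that interface detail values referencing usermacros are left unmasked."""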
data = {
"interfaceid": 123,
"details": {
"authpassphrase": "{$SECRET_MACRO}",
"privpassphrase": "{$SECRET_MACRO}",
"securityname": "{$USER_MACRO}",
"community": "{$SNNMP_COMMUNITY}",
}
}
sanitized = sanatize_log_output(data)
# Macro values should not be sanitized
assert sanitized["details"]["authpassphrase"] == "{$SECRET_MACRO}"
assert sanitized["details"]["privpassphrase"] == "{$SECRET_MACRO}"
assert sanitized["details"]["securityname"] == "{$USER_MACRO}"
    assert sanitized["details"]["community"] == "{$SNMP_COMMUNITY}"
assert "interfaceid" not in sanitized
def test_sanatize_log_output_plain_data():
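    """Test that dictionaries without sensitive fields are returned unchanged."""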
data = {"foo": "bar", "baz": 123}
sanitized = sanatize_log_output(data)
assert sanitized == data
def test_sanatize_log_output_non_dict():
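    """Test that non-dict input is returned as-is."""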
data = [1, 2, 3]
sanitized = sanatize_log_output(data)
assert sanitized == data

125
tests/test_usermacros.py Normal file
View File

@ -0,0 +1,125 @@
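"""Tests for usermacro synchronisation and the ZabbixUsermacros class."""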
import unittest
from unittest.mock import MagicMock, patch
from modules.device import PhysicalDevice
from modules.usermacros import ZabbixUsermacros
class DummyNB:
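    """Minimal stand-in for a NetBox object used by the usermacro tests."""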
def __init__(self, name="dummy", config_context=None, **kwargs):
self.name = name
self.config_context = config_context or {}
for k, v in kwargs.items():
setattr(self, k, v)
def __getitem__(self, key):
# Allow dict-style access for test compatibility
if hasattr(self, key):
return getattr(self, key)
if key in self.config_context:
return self.config_context[key]
raise KeyError(key)
class TestUsermacroSync(unittest.TestCase):
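    """Test class for usermacro synchronisation on PhysicalDevice."""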
def setUp(self):
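        """Set up test fixtures."""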
self.nb = DummyNB(serial="1234")
self.logger = MagicMock()
self.usermacro_map = {"serial": "{$HW_SERIAL}"}
@patch("modules.device.config", {"usermacro_sync": False})
def test_usermacro_sync_false(self):
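        """Test that set_usermacros leaves usermacros empty when usermacro_sync is False."""
        # Create a bare instance so only the attributes needed by set_usermacros are set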
device = PhysicalDevice.__new__(PhysicalDevice)
device.nb = self.nb
device.logger = self.logger
device.name = "dummy"
device._usermacro_map = MagicMock(return_value=self.usermacro_map)
# call set_usermacros
result = device.set_usermacros()
self.assertEqual(device.usermacros, [])
self.assertTrue(result is True or result is None)
@patch("modules.device.config", {"usermacro_sync": True})
def test_usermacro_sync_true(self):
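        """Test that set_usermacros generates macros when usermacro_sync is True."""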
device = PhysicalDevice.__new__(PhysicalDevice)
device.nb = self.nb
device.logger = self.logger
device.name = "dummy"
device._usermacro_map = MagicMock(return_value=self.usermacro_map)
result = device.set_usermacros()
self.assertIsInstance(device.usermacros, list)
self.assertGreater(len(device.usermacros), 0)
@patch("modules.device.config", {"usermacro_sync": "full"})
def test_usermacro_sync_full(self):
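        """Test that set_usermacros generates macros when usermacro_sync is set to full."""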
device = PhysicalDevice.__new__(PhysicalDevice)
device.nb = self.nb
device.logger = self.logger
device.name = "dummy"
device._usermacro_map = MagicMock(return_value=self.usermacro_map)
result = device.set_usermacros()
self.assertIsInstance(device.usermacros, list)
self.assertGreater(len(device.usermacros), 0)
class TestZabbixUsermacros(unittest.TestCase):
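    """Test class for ZabbixUsermacros functionality."""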
def setUp(self):
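        """Set up test fixtures."""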
self.nb = DummyNB()
self.logger = MagicMock()
def test_validate_macro_valid(self):
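        """Test that well-formed macro names validate successfully."""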
macros = ZabbixUsermacros(self.nb, {}, False, logger=self.logger)
self.assertTrue(macros.validate_macro("{$TEST_MACRO}"))
self.assertTrue(macros.validate_macro("{$A1_2.3}"))
self.assertTrue(macros.validate_macro("{$FOO:bar}"))
def test_validate_macro_invalid(self):
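        """Test that malformed or lower-case macro names are rejected."""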
macros = ZabbixUsermacros(self.nb, {}, False, logger=self.logger)
self.assertFalse(macros.validate_macro("$TEST_MACRO"))
self.assertFalse(macros.validate_macro("{TEST_MACRO}"))
self.assertFalse(macros.validate_macro("{$test}")) # lower-case not allowed
self.assertFalse(macros.validate_macro(""))
def test_render_macro_dict(self):
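        """Test rendering a macro from a dict with value, type and description."""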
macros = ZabbixUsermacros(self.nb, {}, False, logger=self.logger)
macro = macros.render_macro("{$FOO}", {"value": "bar", "type": "secret", "description": "desc"})
self.assertEqual(macro["macro"], "{$FOO}")
self.assertEqual(macro["value"], "bar")
self.assertEqual(macro["type"], "1")
self.assertEqual(macro["description"], "desc")
def test_render_macro_dict_missing_value(self):
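        """Test that rendering a dict without a value fails and logs a warning."""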
macros = ZabbixUsermacros(self.nb, {}, False, logger=self.logger)
result = macros.render_macro("{$FOO}", {"type": "text"})
self.assertFalse(result)
self.logger.warning.assert_called()
def test_render_macro_str(self):
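        """Test rendering a macro from a plain string value."""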
macros = ZabbixUsermacros(self.nb, {}, False, logger=self.logger)
macro = macros.render_macro("{$FOO}", "bar")
self.assertEqual(macro["macro"], "{$FOO}")
self.assertEqual(macro["value"], "bar")
self.assertEqual(macro["type"], "0")
self.assertEqual(macro["description"], "")
def test_render_macro_invalid_name(self):
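        """Test that an invalid macro name fails and logs an error."""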
macros = ZabbixUsermacros(self.nb, {}, False, logger=self.logger)
result = macros.render_macro("FOO", "bar")
self.assertFalse(result)
self.logger.error.assert_called()
def test_generate_from_map(self):
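        """Test generating macros from the usermacro map."""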
nb = DummyNB(memory="bar", role="baz")
usermacro_map = {"memory": "{$FOO}", "role": "{$BAR}"}
macros = ZabbixUsermacros(nb, usermacro_map, True, logger=self.logger)
result = macros.generate()
self.assertEqual(len(result), 2)
self.assertEqual(result[0]["macro"], "{$FOO}")
self.assertEqual(result[1]["macro"], "{$BAR}")
def test_generate_from_config_context(self):
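        """Test generating macros from the device config context."""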
config_context = {"zabbix": {"usermacros": {"{$FOO}": {"value": "bar"}}}}
nb = DummyNB(config_context=config_context)
macros = ZabbixUsermacros(nb, {}, True, logger=self.logger)
result = macros.generate()
self.assertEqual(len(result), 1)
self.assertEqual(result[0]["macro"], "{$FOO}")
if __name__ == "__main__":
unittest.main()