diff --git a/modules/device.py b/modules/device.py index b8bda3f..95f9d4f 100644 --- a/modules/device.py +++ b/modules/device.py @@ -19,7 +19,7 @@ from modules.exceptions import ( from modules.hostgroups import Hostgroup from modules.interface import ZabbixInterface from modules.tags import ZabbixTags -from modules.tools import field_mapper, remove_duplicates +from modules.tools import field_mapper, remove_duplicates, sanatize_log_output from modules.usermacros import ZabbixUsermacros from modules.config import load_config @@ -594,7 +594,7 @@ class PhysicalDevice: ) self.logger.error(e) raise SyncExternalError(e) from None - self.logger.info(f"Updated host {self.name} with data {kwargs}.") + self.logger.info(f"Host {self.name}: updated with data {sanatize_log_output(kwargs)}.") self.create_journal_entry("info", "Updated host in Zabbix with latest NB data.") def ConsistencyCheck( @@ -854,7 +854,7 @@ class PhysicalDevice: try: # API call to Zabbix self.zabbix.hostinterface.update(updates) - e = f"Host {self.name}: solved interface conflict." + e = f"Host {self.name}: updated interface with data {sanatize_log_output(updates)}." self.logger.info(e) self.create_journal_entry("info", e) except APIRequestError as e: diff --git a/modules/tools.py b/modules/tools.py index 791025d..1b641f2 100644 --- a/modules/tools.py +++ b/modules/tools.py @@ -99,3 +99,39 @@ def remove_duplicates(input_list, sortkey=None): if sortkey and isinstance(sortkey, str): output_list.sort(key=lambda x: x[sortkey]) return output_list + +def sanatize_log_output(data): + """ + Used for the update function to Zabbix which + shows the data that its using to update the host. + Removes and sensitive data from the input. + """ + if not isinstance(data, dict): + return data + sanitized_data = data.copy() + # Check if there are any sensitive macros defined in the data + if "macros" in data: + for macro in sanitized_data["macros"]: + # Check if macro is secret type + if not macro["type"] == str(1): + continue + macro["value"] = "********" + # Check for interface data + if "interfaceid" in data: + # Interface tID is a value which is most likely not helpful + # in logging output or for roubleshooting. + del sanitized_data["interfaceid"] + # InterfaceID also hints that this is a interface update. + # A check is required if there are no macro's used for SNMP security parameters. + if not "details" in data: + return sanitized_data + for key, detail in sanitized_data["details"].items(): + # If the detail is a secret, we don't want to log it. + if key in ("authpassphrase", "privpassphrase", "securityname", "community"): + # Check if a macro is used. + # If so then logging the output is not a security issue. + if detail.startswith("{$") and detail.endswith("}"): + continue + # A macro is not used, so we sanitize the value. + sanitized_data["details"][key] = "********" + return sanitized_data diff --git a/tests/test_tools.py b/tests/test_tools.py new file mode 100644 index 0000000..3e6ae24 --- /dev/null +++ b/tests/test_tools.py @@ -0,0 +1,62 @@ +from modules.tools import sanatize_log_output + +def test_sanatize_log_output_secrets(): + data = { + "macros": [ + {"macro": "{$SECRET}", "type": "1", "value": "supersecret"}, + {"macro": "{$PLAIN}", "type": "0", "value": "notsecret"}, + ] + } + sanitized = sanatize_log_output(data) + assert sanitized["macros"][0]["value"] == "********" + assert sanitized["macros"][1]["value"] == "notsecret" + +def test_sanatize_log_output_interface_secrets(): + data = { + "interfaceid": 123, + "details": { + "authpassphrase": "supersecret", + "privpassphrase": "anothersecret", + "securityname": "sensitiveuser", + "community": "public", + "other": "normalvalue" + } + } + sanitized = sanatize_log_output(data) + # Sensitive fields should be sanitized + assert sanitized["details"]["authpassphrase"] == "********" + assert sanitized["details"]["privpassphrase"] == "********" + assert sanitized["details"]["securityname"] == "********" + # Non-sensitive fields should remain + assert sanitized["details"]["community"] == "********" + assert sanitized["details"]["other"] == "normalvalue" + # interfaceid should be removed + assert "interfaceid" not in sanitized + +def test_sanatize_log_output_interface_macros(): + data = { + "interfaceid": 123, + "details": { + "authpassphrase": "{$SECRET_MACRO}", + "privpassphrase": "{$SECRET_MACRO}", + "securityname": "{$USER_MACRO}", + "community": "{$SNNMP_COMMUNITY}", + } + } + sanitized = sanatize_log_output(data) + # Macro values should not be sanitized + assert sanitized["details"]["authpassphrase"] == "{$SECRET_MACRO}" + assert sanitized["details"]["privpassphrase"] == "{$SECRET_MACRO}" + assert sanitized["details"]["securityname"] == "{$USER_MACRO}" + assert sanitized["details"]["community"] == "{$SNNMP_COMMUNITY}" + assert "interfaceid" not in sanitized + +def test_sanatize_log_output_plain_data(): + data = {"foo": "bar", "baz": 123} + sanitized = sanatize_log_output(data) + assert sanitized == data + +def test_sanatize_log_output_non_dict(): + data = [1, 2, 3] + sanitized = sanatize_log_output(data) + assert sanitized == data