Merge branch 'main' into feature
CI / build (20.x, 3.12) (push) Has been cancelled
CI / build (20.x, 3.13) (push) Has been cancelled
CodeQL / Analyze (${{ matrix.language }}) (none, actions) (push) Has been cancelled
CodeQL / Analyze (${{ matrix.language }}) (none, javascript-typescript) (push) Has been cancelled
CodeQL / Analyze (${{ matrix.language }}) (none, python) (push) Has been cancelled

This commit is contained in:
Jeremy Stretch
2025-10-29 13:47:01 -04:00
107 changed files with 12674 additions and 10835 deletions
+24 -3
View File
@@ -5,6 +5,7 @@ from rest_framework import serializers
from core.api.serializers_.jobs import JobSerializer
from extras.models import Script
from netbox.api.serializers import ValidatedModelSerializer
from utilities.datetime import local_now
__all__ = (
'ScriptDetailSerializer',
@@ -66,11 +67,31 @@ class ScriptInputSerializer(serializers.Serializer):
interval = serializers.IntegerField(required=False, allow_null=True)
def validate_schedule_at(self, value):
if value and not self.context['script'].python_class.scheduling_enabled:
raise serializers.ValidationError(_("Scheduling is not enabled for this script."))
"""
Validates the specified schedule time for a script execution.
"""
if value:
if not self.context['script'].python_class.scheduling_enabled:
raise serializers.ValidationError(_('Scheduling is not enabled for this script.'))
if value < local_now():
raise serializers.ValidationError(_('Scheduled time must be in the future.'))
return value
def validate_interval(self, value):
"""
Validates the provided interval based on the script's scheduling configuration.
"""
if value and not self.context['script'].python_class.scheduling_enabled:
raise serializers.ValidationError(_("Scheduling is not enabled for this script."))
raise serializers.ValidationError(_('Scheduling is not enabled for this script.'))
return value
def validate(self, data):
"""
Validates the given data and ensures the necessary fields are populated.
"""
# Set the schedule_at time to now if only an interval is provided
# while handling the case where schedule_at is null.
if data.get('interval') and not data.get('schedule_at'):
data['schedule_at'] = local_now()
return super().validate(data)
+16
View File
@@ -536,6 +536,15 @@ class CustomField(CloningMixin, ExportTemplatesMixin, OwnerMixin, ChangeLoggedMo
# URL
elif self.type == CustomFieldTypeChoices.TYPE_URL:
field = LaxURLField(assume_scheme='https', required=required, initial=initial)
if self.validation_regex:
field.validators = [
RegexValidator(
regex=self.validation_regex,
message=mark_safe(_("Values must match this regex: <code>{regex}</code>").format(
regex=escape(self.validation_regex)
))
)
]
# JSON
elif self.type == CustomFieldTypeChoices.TYPE_JSON:
@@ -685,6 +694,13 @@ class CustomField(CloningMixin, ExportTemplatesMixin, OwnerMixin, ChangeLoggedMo
if self.validation_regex and not re.match(self.validation_regex, value):
raise ValidationError(_("Value must match regex '{regex}'").format(regex=self.validation_regex))
# Validate URL field
elif self.type == CustomFieldTypeChoices.TYPE_URL:
if type(value) is not str:
raise ValidationError(_("Value must be a string."))
if self.validation_regex and not re.match(self.validation_regex, value):
raise ValidationError(_("Value must match regex '{regex}'").format(regex=self.validation_regex))
# Validate integer
elif self.type == CustomFieldTypeChoices.TYPE_INTEGER:
if type(value) is not int:
+79 -7
View File
@@ -901,16 +901,16 @@ class ConfigTemplateTest(APIViewTestCases.APIViewTestCase):
class ScriptTest(APITestCase):
class TestScriptClass(PythonClass):
class Meta:
name = "Test script"
name = 'Test script'
commit = True
scheduling_enabled = True
var1 = StringVar()
var2 = IntegerVar()
var3 = BooleanVar()
def run(self, data, commit=True):
self.log_info(data['var1'])
self.log_success(data['var2'])
self.log_failure(data['var3'])
@@ -921,14 +921,16 @@ class ScriptTest(APITestCase):
def setUpTestData(cls):
module = ScriptModule.objects.create(
file_root=ManagedFileRootPathChoices.SCRIPTS,
file_path='/var/tmp/script.py'
file_path='script.py',
)
Script.objects.create(
script = Script.objects.create(
module=module,
name="Test script",
name='Test script',
is_executable=True,
)
cls.url = reverse('extras-api:script-detail', kwargs={'pk': script.pk})
@property
def python_class(self):
return self.TestScriptClass
@@ -941,7 +943,7 @@ class ScriptTest(APITestCase):
def test_get_script(self):
module = ScriptModule.objects.get(
file_root=ManagedFileRootPathChoices.SCRIPTS,
file_path='/var/tmp/script.py'
file_path='script.py',
)
script = module.scripts.all().first()
url = reverse('extras-api:script-detail', kwargs={'pk': script.pk})
@@ -952,6 +954,76 @@ class ScriptTest(APITestCase):
self.assertEqual(response.data['vars']['var2'], 'IntegerVar')
self.assertEqual(response.data['vars']['var3'], 'BooleanVar')
def test_schedule_script_past_time_rejected(self):
"""
Scheduling with past schedule_at should fail.
"""
self.add_permissions('extras.run_script')
payload = {
'data': {'var1': 'hello', 'var2': 1, 'var3': False},
'commit': True,
'schedule_at': now() - datetime.timedelta(hours=1),
}
response = self.client.post(self.url, payload, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
self.assertIn('schedule_at', response.data)
# Be tolerant of exact wording but ensure we failed on schedule_at being in the past
self.assertIn('future', str(response.data['schedule_at']).lower())
def test_schedule_script_interval_only(self):
"""
Interval without schedule_at should auto-set schedule_at now.
"""
self.add_permissions('extras.run_script')
payload = {
'data': {'var1': 'hello', 'var2': 1, 'var3': False},
'commit': True,
'interval': 60,
}
response = self.client.post(self.url, payload, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
# The latest job is returned in the script detail serializer under "result"
self.assertIn('result', response.data)
self.assertEqual(response.data['result']['interval'], 60)
# Ensure a start time was autopopulated
self.assertIsNotNone(response.data['result']['scheduled'])
def test_schedule_script_when_disabled(self):
"""
Scheduling should fail when script.scheduling_enabled=False.
"""
self.add_permissions('extras.run_script')
# Temporarily disable scheduling on the in-test Python class
original = getattr(self.TestScriptClass.Meta, 'scheduling_enabled', True)
self.TestScriptClass.Meta.scheduling_enabled = False
base = {
'data': {'var1': 'hello', 'var2': 1, 'var3': False},
'commit': True,
}
# Check both schedule_at and interval paths
cases = [
{**base, 'schedule_at': now() + datetime.timedelta(minutes=5)},
{**base, 'interval': 60},
]
try:
for case in cases:
with self.subTest(case=list(case.keys())):
response = self.client.post(self.url, case, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
# Error should be attached to whichever field we used
key = 'schedule_at' if 'schedule_at' in case else 'interval'
self.assertIn(key, response.data)
self.assertIn('scheduling is not enabled', str(response.data[key]).lower())
finally:
# Restore the original setting for other tests
self.TestScriptClass.Meta.scheduling_enabled = original
class CreatedUpdatedFilterTest(APITestCase):
+22
View File
@@ -1300,6 +1300,28 @@ class CustomFieldAPITest(APITestCase):
response = self.client.patch(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
def test_url_regex_validation(self):
"""
Test that validation_regex is applied to URL custom fields (fixes #20498).
"""
site2 = Site.objects.get(name='Site 2')
url = reverse('dcim-api:site-detail', kwargs={'pk': site2.pk})
self.add_permissions('dcim.change_site')
cf_url = CustomField.objects.get(name='url_field')
cf_url.validation_regex = r'^https://' # Require HTTPS
cf_url.save()
# Test invalid URL (http instead of https)
data = {'custom_fields': {'url_field': 'http://example.com'}}
response = self.client.patch(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
# Test valid URL (https)
data = {'custom_fields': {'url_field': 'https://example.com'}}
response = self.client.patch(url, data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
def test_uniqueness_validation(self):
# Create a unique custom field
cf_text = CustomField.objects.get(name='text_field')
+70
View File
@@ -1,11 +1,14 @@
from django.contrib.contenttypes.models import ContentType
from django.urls import reverse
from django.test import tag
from core.choices import ManagedFileRootPathChoices
from core.events import *
from core.models import ObjectType
from dcim.models import DeviceType, Manufacturer, Site
from extras.choices import *
from extras.models import *
from extras.scripts import Script as PythonClass, IntegerVar, BooleanVar
from users.models import Group, User
from utilities.testing import ViewTestCases, TestCase
@@ -897,3 +900,70 @@ class ScriptListViewTest(TestCase):
response = self.client.get(url, {'embedded': 'true'})
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, 'extras/inc/script_list_content.html')
class ScriptValidationErrorTest(TestCase):
user_permissions = ['extras.view_script', 'extras.run_script']
class TestScriptMixin:
bar = IntegerVar(min_value=0, max_value=30, default=30)
class TestScriptClass(TestScriptMixin, PythonClass):
class Meta:
name = 'Test script'
commit_default = False
fieldsets = (("Logging", ("debug_mode",)),)
debug_mode = BooleanVar(default=False)
def run(self, data, commit):
return "Complete"
@classmethod
def setUpTestData(cls):
module = ScriptModule.objects.create(file_root=ManagedFileRootPathChoices.SCRIPTS, file_path='test_script.py')
cls.script = Script.objects.create(module=module, name='Test script', is_executable=True)
def setUp(self):
super().setUp()
Script.python_class = property(lambda self: ScriptValidationErrorTest.TestScriptClass)
@tag('regression')
def test_script_validation_error_displays_message(self):
from unittest.mock import patch
url = reverse('extras:script', kwargs={'pk': self.script.pk})
with patch('extras.views.get_workers_for_queue', return_value=['worker']):
response = self.client.post(url, {'debug_mode': 'true', '_commit': 'true'})
self.assertEqual(response.status_code, 200)
messages = list(response.context['messages'])
self.assertEqual(len(messages), 1)
self.assertEqual(str(messages[0]), "bar: This field is required.")
@tag('regression')
def test_script_validation_error_no_toast_for_fieldset_fields(self):
from unittest.mock import patch, PropertyMock
class FieldsetScript(PythonClass):
class Meta:
name = 'Fieldset test'
commit_default = False
fieldsets = (("Fields", ("required_field",)),)
required_field = IntegerVar(min_value=10)
def run(self, data, commit):
return "Complete"
url = reverse('extras:script', kwargs={'pk': self.script.pk})
with patch.object(Script, 'python_class', new_callable=PropertyMock) as mock_python_class:
mock_python_class.return_value = FieldsetScript
with patch('extras.views.get_workers_for_queue', return_value=['worker']):
response = self.client.post(url, {'required_field': '5', '_commit': 'true'})
self.assertEqual(response.status_code, 200)
messages = list(response.context['messages'])
self.assertEqual(len(messages), 0)
+57 -3
View File
@@ -4,7 +4,7 @@ from django.contrib.auth.mixins import LoginRequiredMixin
from django.contrib.contenttypes.models import ContentType
from django.core.paginator import EmptyPage
from django.db.models import Count, Q
from django.http import HttpResponseBadRequest, HttpResponseForbidden, HttpResponse
from django.http import HttpResponseBadRequest, HttpResponseForbidden, HttpResponse, Http404
from django.shortcuts import get_object_or_404, redirect, render
from django.urls import reverse
from django.utils import timezone
@@ -25,7 +25,7 @@ from netbox.object_actions import *
from netbox.views import generic
from netbox.views.generic.mixins import TableMixin
from utilities.forms import ConfirmationForm, get_field_value
from utilities.htmx import htmx_partial
from utilities.htmx import htmx_partial, htmx_maybe_redirect_current_page
from utilities.paginator import EnhancedPaginator, get_paginate_count
from utilities.query import count_related
from utilities.querydict import normalize_querydict
@@ -101,6 +101,7 @@ class CustomFieldBulkEditView(generic.BulkEditView):
@register_model_view(CustomField, 'bulk_rename', path='rename', detail=False)
class CustomFieldBulkRenameView(generic.BulkRenameView):
queryset = CustomField.objects.all()
filterset = filtersets.CustomFieldFilterSet
@register_model_view(CustomField, 'bulk_delete', path='delete', detail=False)
@@ -175,6 +176,7 @@ class CustomFieldChoiceSetBulkEditView(generic.BulkEditView):
@register_model_view(CustomFieldChoiceSet, 'bulk_rename', path='rename', detail=False)
class CustomFieldChoiceSetBulkRenameView(generic.BulkRenameView):
queryset = CustomFieldChoiceSet.objects.all()
filterset = filtersets.CustomFieldChoiceSetFilterSet
@register_model_view(CustomFieldChoiceSet, 'bulk_delete', path='delete', detail=False)
@@ -230,6 +232,7 @@ class CustomLinkBulkEditView(generic.BulkEditView):
@register_model_view(CustomLink, 'bulk_rename', path='rename', detail=False)
class CustomLinkBulkRenameView(generic.BulkRenameView):
queryset = CustomLink.objects.all()
filterset = filtersets.CustomLinkFilterSet
@register_model_view(CustomLink, 'bulk_delete', path='delete', detail=False)
@@ -286,6 +289,7 @@ class ExportTemplateBulkEditView(generic.BulkEditView):
@register_model_view(ExportTemplate, 'bulk_rename', path='rename', detail=False)
class ExportTemplateBulkRenameView(generic.BulkRenameView):
queryset = ExportTemplate.objects.all()
filterset = filtersets.ExportTemplateFilterSet
@register_model_view(ExportTemplate, 'bulk_delete', path='delete', detail=False)
@@ -351,6 +355,7 @@ class SavedFilterBulkEditView(SharedObjectViewMixin, generic.BulkEditView):
@register_model_view(SavedFilter, 'bulk_rename', path='rename', detail=False)
class SavedFilterBulkRenameView(generic.BulkRenameView):
queryset = SavedFilter.objects.all()
filterset = filtersets.SavedFilterFilterSet
@register_model_view(SavedFilter, 'bulk_delete', path='delete', detail=False)
@@ -413,6 +418,7 @@ class TableConfigBulkEditView(SharedObjectViewMixin, generic.BulkEditView):
@register_model_view(TableConfig, 'bulk_rename', path='rename', detail=False)
class TableConfigBulkRenameView(generic.BulkRenameView):
queryset = TableConfig.objects.all()
filterset = filtersets.TableConfigFilterSet
@register_model_view(TableConfig, 'bulk_delete', path='delete', detail=False)
@@ -499,6 +505,7 @@ class NotificationGroupBulkEditView(generic.BulkEditView):
@register_model_view(NotificationGroup, 'bulk_rename', path='rename', detail=False)
class NotificationGroupBulkRenameView(generic.BulkRenameView):
queryset = NotificationGroup.objects.all()
filterset = filtersets.NotificationGroupFilterSet
@register_model_view(NotificationGroup, 'bulk_delete', path='delete', detail=False)
@@ -518,8 +525,9 @@ class NotificationsView(LoginRequiredMixin, View):
"""
def get(self, request):
return render(request, 'htmx/notifications.html', {
'notifications': request.user.notifications.unread(),
'notifications': request.user.notifications.unread()[:10],
'total_count': request.user.notifications.count(),
'unread_count': request.user.notifications.unread().count(),
})
@@ -528,6 +536,7 @@ class NotificationReadView(LoginRequiredMixin, View):
"""
Mark the Notification read and redirect the user to its attached object.
"""
def get(self, request, pk):
# Mark the Notification as read
notification = get_object_or_404(request.user.notifications, pk=pk)
@@ -541,18 +550,48 @@ class NotificationReadView(LoginRequiredMixin, View):
return redirect('account:notifications')
@register_model_view(Notification, name='dismiss_all', path='dismiss-all', detail=False)
class NotificationDismissAllView(LoginRequiredMixin, View):
"""
Convenience view to clear all *unread* notifications for the current user.
"""
def get(self, request):
request.user.notifications.unread().delete()
if htmx_partial(request):
# If a user is currently on the notification page, redirect there (full repaint)
redirect_resp = htmx_maybe_redirect_current_page(request, 'account:notifications', preserve_query=True)
if redirect_resp:
return redirect_resp
return render(request, 'htmx/notifications.html', {
'notifications': request.user.notifications.unread()[:10],
'total_count': request.user.notifications.count(),
'unread_count': request.user.notifications.unread().count(),
})
return redirect('account:notifications')
@register_model_view(Notification, 'dismiss')
class NotificationDismissView(LoginRequiredMixin, View):
"""
A convenience view which allows deleting notifications with one click.
"""
def get(self, request, pk):
notification = get_object_or_404(request.user.notifications, pk=pk)
notification.delete()
if htmx_partial(request):
# If a user is currently on the notification page, redirect there (full repaint)
redirect_resp = htmx_maybe_redirect_current_page(request, 'account:notifications', preserve_query=True)
if redirect_resp:
return redirect_resp
return render(request, 'htmx/notifications.html', {
'notifications': request.user.notifications.unread()[:10],
'total_count': request.user.notifications.count(),
'unread_count': request.user.notifications.unread().count(),
})
return redirect('account:notifications')
@@ -650,6 +689,7 @@ class WebhookBulkEditView(generic.BulkEditView):
@register_model_view(Webhook, 'bulk_rename', path='rename', detail=False)
class WebhookBulkRenameView(generic.BulkRenameView):
queryset = Webhook.objects.all()
filterset = filtersets.WebhookFilterSet
@register_model_view(Webhook, 'bulk_delete', path='delete', detail=False)
@@ -705,6 +745,7 @@ class EventRuleBulkEditView(generic.BulkEditView):
@register_model_view(EventRule, 'bulk_rename', path='rename', detail=False)
class EventRuleBulkRenameView(generic.BulkRenameView):
queryset = EventRule.objects.all()
filterset = filtersets.EventRuleFilterSet
@register_model_view(EventRule, 'bulk_delete', path='delete', detail=False)
@@ -841,6 +882,7 @@ class ConfigContextProfileBulkEditView(generic.BulkEditView):
@register_model_view(ConfigContextProfile, 'bulk_rename', path='rename', detail=False)
class ConfigContextProfileBulkRenameView(generic.BulkRenameView):
queryset = ConfigContextProfile.objects.all()
filterset = filtersets.ConfigContextProfileFilterSet
@register_model_view(ConfigContextProfile, 'bulk_delete', path='delete', detail=False)
@@ -929,6 +971,7 @@ class ConfigContextBulkEditView(generic.BulkEditView):
@register_model_view(ConfigContext, 'bulk_rename', path='rename', detail=False)
class ConfigContextBulkRenameView(generic.BulkRenameView):
queryset = ConfigContext.objects.all()
filterset = filtersets.ConfigContextFilterSet
@register_model_view(ConfigContext, 'bulk_delete', path='delete', detail=False)
@@ -1020,6 +1063,7 @@ class ConfigTemplateBulkEditView(generic.BulkEditView):
@register_model_view(ConfigTemplate, 'bulk_rename', path='rename', detail=False)
class ConfigTemplateBulkRenameView(generic.BulkRenameView):
queryset = ConfigTemplate.objects.all()
filterset = filtersets.ConfigTemplateFilterSet
@register_model_view(ConfigTemplate, 'bulk_delete', path='delete', detail=False)
@@ -1143,6 +1187,7 @@ class ImageAttachmentBulkEditView(generic.BulkEditView):
@register_model_view(ImageAttachment, 'bulk_rename', path='rename', detail=False)
class ImageAttachmentBulkRenameView(generic.BulkRenameView):
queryset = ImageAttachment.objects.all()
filterset = filtersets.ImageAttachmentFilterSet
@register_model_view(ImageAttachment, 'bulk_delete', path='delete', detail=False)
@@ -1485,6 +1530,15 @@ class ScriptView(BaseScriptView):
)
return redirect('extras:script_result', job_pk=job.pk)
else:
fieldset_fields = {field for _, fields in script_class.get_fieldsets() for field in fields}
hidden_errors = {
field: errors for field, errors in form.errors.items()
if field not in fieldset_fields
}
if hidden_errors:
error_msg = '; '.join(f"{field}: {', '.join(errors)}" for field, errors in hidden_errors.items())
messages.error(request, error_msg)
return render(request, 'extras/script.html', {
'object': script,