Compare commits

..

1 Commits

Author SHA1 Message Date
Jason Novinger
cedbeb7b19 Fixes #21176: Remove checkboxes from IP ranges in mixed-type tables
When IP addresses and IP ranges are displayed together in a prefix's
  IP Addresses tab, only IP addresses should be selectable for bulk
  operations since the bulk delete form doesn't support mixed object types.

  - Override render_pk() in AnnotatedIPAddressTable to conditionally render
    checkboxes only for the table's primary model type (IPAddress)
  - Add warning comment to add_requested_prefixes() about fake Prefix objects
  - Add regression test to verify IPAddress has checkboxes but IPRange does not
2026-01-23 09:36:15 -06:00
14 changed files with 127 additions and 166 deletions

View File

@@ -20,9 +20,7 @@ from utilities.forms.fields import (
DynamicModelChoiceField, DynamicModelMultipleChoiceField, JSONField, NumericArrayField, SlugField,
)
from utilities.forms.rendering import FieldSet, InlineFields, TabbedGroups
from utilities.forms.widgets import (
APISelect, ClearableFileInput, ClearableSelect, HTMXSelect, NumberWithOptions, SelectWithPK,
)
from utilities.forms.widgets import APISelect, ClearableFileInput, HTMXSelect, NumberWithOptions, SelectWithPK
from utilities.jsonschema import JSONSchemaProperty
from virtualization.models import Cluster, VMInterface
from wireless.models import WirelessLAN, WirelessLANGroup
@@ -594,14 +592,6 @@ class DeviceForm(TenancyForm, PrimaryModelForm):
},
)
)
face = forms.ChoiceField(
label=_('Face'),
choices=add_blank_choice(DeviceFaceChoices),
required=False,
widget=ClearableSelect(
requires_fields=['rack']
)
)
device_type = DynamicModelChoiceField(
label=_('Device type'),
queryset=DeviceType.objects.all(),

View File

@@ -1,5 +1,5 @@
import logging
from collections import UserDict, defaultdict
from collections import defaultdict
from django.conf import settings
from django.utils import timezone
@@ -12,6 +12,7 @@ from core.models import ObjectType
from netbox.config import get_config
from netbox.constants import RQ_QUEUE_DEFAULT
from netbox.models.features import has_feature
from users.models import User
from utilities.api import get_serializer_for_model
from utilities.request import copy_safe_request
from utilities.rqworker import get_rq_retry
@@ -22,19 +23,6 @@ from .models import EventRule
logger = logging.getLogger('netbox.events_processor')
class EventContext(UserDict):
"""
A custom dictionary that automatically serializes its associated object on demand.
"""
def __getitem__(self, item):
if item == 'data' and 'data' not in self:
data = serialize_for_event(self['object'])
self.__setitem__('data', data)
return data
return super().__getitem__(item)
def serialize_for_event(instance):
"""
Return a serialized representation of the given instance suitable for use in a queued event.
@@ -78,42 +66,37 @@ def enqueue_event(queue, instance, request, event_type):
assert instance.pk is not None
key = f'{app_label}.{model_name}:{instance.pk}'
if key in queue:
queue[key]['data'] = serialize_for_event(instance)
queue[key]['snapshots']['postchange'] = get_snapshots(instance, event_type)['postchange']
# If the object is being deleted, update any prior "update" event to "delete"
if event_type == OBJECT_DELETED:
queue[key]['event_type'] = event_type
else:
queue[key] = EventContext(
object_type=ObjectType.objects.get_for_model(instance),
object_id=instance.pk,
object=instance,
event_type=event_type,
snapshots=get_snapshots(instance, event_type),
request=request,
user=request.user,
queue[key] = {
'object_type': ObjectType.objects.get_for_model(instance),
'object_id': instance.pk,
'event_type': event_type,
'data': serialize_for_event(instance),
'snapshots': get_snapshots(instance, event_type),
'request': request,
# Legacy request attributes for backward compatibility
username=request.user.username,
request_id=request.id,
)
# Force serialization of objects prior to them actually being deleted
if event_type == OBJECT_DELETED:
queue[key]['data'] = serialize_for_event(instance)
'username': request.user.username,
'request_id': request.id,
}
def process_event_rules(event_rules, object_type, event):
"""
Process a list of EventRules against an event.
"""
def process_event_rules(event_rules, object_type, event_type, data, username=None, snapshots=None, request=None):
user = User.objects.get(username=username) if username else None
for event_rule in event_rules:
# Evaluate event rule conditions (if any)
if not event_rule.eval_conditions(event['data']):
if not event_rule.eval_conditions(data):
continue
# Compile event data
event_data = event_rule.action_data or {}
event_data.update(event['data'])
event_data.update(data)
# Webhooks
if event_rule.action_type == EventRuleActionChoices.WEBHOOK:
@@ -126,22 +109,25 @@ def process_event_rules(event_rules, object_type, event):
params = {
"event_rule": event_rule,
"object_type": object_type,
"event_type": event['event_type'],
"event_type": event_type,
"data": event_data,
"snapshots": event['snapshots'],
"snapshots": snapshots,
"timestamp": timezone.now().isoformat(),
"username": event['username'],
"username": username,
"retry": get_rq_retry()
}
if 'snapshots' in event:
params['snapshots'] = event['snapshots']
if 'request' in event:
if snapshots:
params["snapshots"] = snapshots
if request:
# Exclude FILES - webhooks don't need uploaded files,
# which can cause pickle errors with Pillow.
params['request'] = copy_safe_request(event['request'], include_files=False)
params["request"] = copy_safe_request(request, include_files=False)
# Enqueue the task
rq_queue.enqueue('extras.webhooks.send_webhook', **params)
rq_queue.enqueue(
"extras.webhooks.send_webhook",
**params
)
# Scripts
elif event_rule.action_type == EventRuleActionChoices.SCRIPT:
@@ -153,16 +139,16 @@ def process_event_rules(event_rules, object_type, event):
params = {
"instance": event_rule.action_object,
"name": script.name,
"user": event['user'],
"user": user,
"data": event_data
}
if 'snapshots' in event:
params['snapshots'] = event['snapshots']
if 'request' in event:
params['request'] = copy_safe_request(event['request'])
# Enqueue the job
ScriptJob.enqueue(**params)
if snapshots:
params["snapshots"] = snapshots
if request:
params["request"] = copy_safe_request(request)
ScriptJob.enqueue(
**params
)
# Notification groups
elif event_rule.action_type == EventRuleActionChoices.NOTIFICATION:
@@ -171,7 +157,7 @@ def process_event_rules(event_rules, object_type, event):
object_type=object_type,
object_id=event_data['id'],
object_repr=event_data.get('display'),
event_type=event['event_type']
event_type=event_type
)
else:
@@ -183,8 +169,6 @@ def process_event_rules(event_rules, object_type, event):
def process_event_queue(events):
"""
Flush a list of object representation to RQ for EventRule processing.
This is the default processor listed in EVENTS_PIPELINE.
"""
events_cache = defaultdict(dict)
@@ -204,7 +188,11 @@ def process_event_queue(events):
process_event_rules(
event_rules=event_rules,
object_type=object_type,
event=event,
event_type=event['event_type'],
data=event['data'],
username=event['username'],
snapshots=event['snapshots'],
request=event['request'],
)

View File

@@ -4,7 +4,7 @@ from django.dispatch import receiver
from core.events import *
from core.signals import job_end, job_start
from extras.events import EventContext, process_event_rules
from extras.events import process_event_rules
from extras.models import EventRule, Notification, Subscription
from netbox.config import get_config
from netbox.models.features import has_feature
@@ -102,12 +102,14 @@ def process_job_start_event_rules(sender, **kwargs):
enabled=True,
object_types=sender.object_type
)
event = EventContext(
username = sender.user.username if sender.user else None
process_event_rules(
event_rules=event_rules,
object_type=sender.object_type,
event_type=JOB_STARTED,
data=sender.data,
user=sender.user,
username=username
)
process_event_rules(event_rules, sender.object_type, event)
@receiver(job_end)
@@ -120,12 +122,14 @@ def process_job_end_event_rules(sender, **kwargs):
enabled=True,
object_types=sender.object_type
)
event = EventContext(
username = sender.user.username if sender.user else None
process_event_rules(
event_rules=event_rules,
object_type=sender.object_type,
event_type=JOB_COMPLETED,
data=sender.data,
user=sender.user,
username=username
)
process_event_rules(event_rules, sender.object_type, event)
#

View File

@@ -43,7 +43,7 @@ IMAGEATTACHMENT_IMAGE = """
<a href="{{ record.image.url }}" target="_blank" class="image-preview" data-bs-placement="top">
<i class="mdi mdi-image"></i></a>
{% endif %}
<a href="{{ record.get_absolute_url }}">{{ record.filename|truncate_middle:16 }}</a>
<a href="{{ record.get_absolute_url }}">{{ record }}</a>
"""
NOTIFICATION_ICON = """

View File

@@ -370,6 +370,11 @@ class AnnotatedIPAddressTable(IPAddressTable):
verbose_name=_('IP Address')
)
def render_pk(self, value, record, bound_column):
if type(record) is not self._meta.model:
return ''
return bound_column.column.render(value, bound_column, record)
class Meta(IPAddressTable.Meta):
pass

View File

@@ -0,0 +1,41 @@
from django.test import RequestFactory, TestCase
from netaddr import IPNetwork
from ipam.models import IPAddress, IPRange, Prefix
from ipam.tables import AnnotatedIPAddressTable
from ipam.utils import annotate_ip_space
class AnnotatedIPAddressTableTest(TestCase):
@classmethod
def setUpTestData(cls):
cls.prefix = Prefix.objects.create(
prefix=IPNetwork('10.1.1.0/24'),
status='active'
)
cls.ip_address = IPAddress.objects.create(
address='10.1.1.1/24',
status='active'
)
cls.ip_range = IPRange.objects.create(
start_address=IPNetwork('10.1.1.2/24'),
end_address=IPNetwork('10.1.1.10/24'),
status='active'
)
def test_ipaddress_has_checkbox_iprange_does_not(self):
data = annotate_ip_space(self.prefix)
table = AnnotatedIPAddressTable(data, orderable=False)
table.columns.show('pk')
request = RequestFactory().get('/')
html = table.as_html(request)
ipaddress_checkbox_count = html.count(f'name="pk" value="{self.ip_address.pk}"')
self.assertEqual(ipaddress_checkbox_count, 1)
iprange_checkbox_count = html.count(f'name="pk" value="{self.ip_range.pk}"')
self.assertEqual(iprange_checkbox_count, 0)

View File

@@ -49,6 +49,9 @@ def add_requested_prefixes(parent, prefix_list, show_available=True, show_assign
if prefix_list and show_available:
# Find all unallocated space, add fake Prefix objects to child_prefixes.
# IMPORTANT: These are unsaved Prefix instances (pk=None). If this is ever changed to use
# saved Prefix instances with real pks, bulk delete will fail for mixed-type selections
# due to single-model form validation. See: https://github.com/netbox-community/netbox/issues/21176
available_prefixes = netaddr.IPSet(parent) ^ netaddr.IPSet([p.prefix for p in prefix_list])
available_prefixes = [Prefix(prefix=p, status=None) for p in available_prefixes.iter_cidrs()]
child_prefixes = child_prefixes + available_prefixes

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -1,40 +0,0 @@
import TomSelect from 'tom-select';
import { getElements } from '../util';
/**
* Initialize clear-field dependencies.
* When a required field is cleared, dependent fields with data-requires-fields attribute will also be cleared.
*/
export function initClearField(): void {
// Find all fields with data-requires-fields attribute
for (const field of getElements<HTMLSelectElement>('[data-requires-fields]')) {
const requiredFieldsAttr = field.getAttribute('data-requires-fields');
if (!requiredFieldsAttr) continue;
// Parse the comma-separated list of required field names
const requiredFields = requiredFieldsAttr.split(',').map(name => name.trim());
// Set up listeners for each required field
for (const requiredFieldName of requiredFields) {
const requiredField = document.querySelector<HTMLSelectElement>(
`[name="${requiredFieldName}"]`,
);
if (!requiredField) continue;
// Listen for changes on the required field
requiredField.addEventListener('change', () => {
// If required field is cleared, also clear this dependent field
if (!requiredField.value || requiredField.value === '') {
// Check if this field uses TomSelect
const tomselect = (field as HTMLSelectElement & { tomselect?: TomSelect }).tomselect;
if (tomselect) {
tomselect.clear();
} else {
// Regular select field
field.value = '';
}
}
});
}
}
}

View File

@@ -1,10 +1,9 @@
import { initClearField } from './clearField';
import { initFormElements } from './elements';
import { initFilterModifiers } from './filterModifiers';
import { initSpeedSelector } from './speedSelector';
export function initForms(): void {
for (const func of [initFormElements, initSpeedSelector, initFilterModifiers, initClearField]) {
for (const func of [initFormElements, initSpeedSelector, initFilterModifiers]) {
func();
}
}

View File

@@ -8,7 +8,7 @@ msgid ""
msgstr ""
"Project-Id-Version: PACKAGE VERSION\n"
"Report-Msgid-Bugs-To: \n"
"POT-Creation-Date: 2026-01-22 05:07+0000\n"
"POT-Creation-Date: 2026-01-21 05:07+0000\n"
"PO-Revision-Date: YEAR-MO-DA HO:MI+ZONE\n"
"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
"Language-Team: LANGUAGE <LL@li.org>\n"
@@ -12037,7 +12037,7 @@ msgstr ""
msgid "date synced"
msgstr ""
#: netbox/netbox/models/features.py:621
#: netbox/netbox/models/features.py:623
#, python-brace-format
msgid "{class_name} must implement a sync_data() method."
msgstr ""
@@ -13935,8 +13935,8 @@ msgid "No VLANs Assigned"
msgstr ""
#: netbox/templates/dcim/inc/interface_vlans_table.html:44
#: netbox/templates/ipam/inc/max_depth.html:11
#: netbox/templates/ipam/inc/max_length.html:11
#: netbox/templates/ipam/prefix_list.html:16
#: netbox/templates/ipam/prefix_list.html:33
msgid "Clear"
msgstr ""
@@ -15053,8 +15053,8 @@ msgstr ""
msgid "Date Added"
msgstr ""
#: netbox/templates/ipam/aggregate/prefixes.html:10
#: netbox/templates/ipam/prefix/prefixes.html:10
#: netbox/templates/ipam/aggregate/prefixes.html:8
#: netbox/templates/ipam/prefix/prefixes.html:8
#: netbox/templates/ipam/role.html:10
msgid "Add Prefix"
msgstr ""
@@ -15083,14 +15083,6 @@ msgstr ""
msgid "Bulk Create"
msgstr ""
#: netbox/templates/ipam/inc/max_depth.html:6
msgid "Max Depth"
msgstr ""
#: netbox/templates/ipam/inc/max_length.html:6
msgid "Max Length"
msgstr ""
#: netbox/templates/ipam/inc/panels/fhrp_groups.html:10
msgid "Create Group"
msgstr ""
@@ -15192,6 +15184,14 @@ msgstr ""
msgid "Hide Depth Indicators"
msgstr ""
#: netbox/templates/ipam/prefix_list.html:11
msgid "Max Depth"
msgstr ""
#: netbox/templates/ipam/prefix_list.html:28
msgid "Max Length"
msgstr ""
#: netbox/templates/ipam/rir.html:10
msgid "Add Aggregate"
msgstr ""

View File

@@ -5,7 +5,6 @@ from ..utils import add_blank_choice
__all__ = (
'BulkEditNullBooleanSelect',
'ClearableSelect',
'ColorSelect',
'HTMXSelect',
'SelectWithPK',
@@ -29,21 +28,6 @@ class BulkEditNullBooleanSelect(forms.NullBooleanSelect):
)
class ClearableSelect(forms.Select):
"""
A Select widget that will be automatically cleared when one or more required fields are cleared.
Args:
requires_fields: A list of field names that this field depends on. When any of these fields
are cleared, this field will also be cleared automatically via JavaScript.
"""
def __init__(self, *args, requires_fields=None, **kwargs):
super().__init__(*args, **kwargs)
if requires_fields:
self.attrs['data-requires-fields'] = ','.join(requires_fields)
class ColorSelect(forms.Select):
"""
Extends the built-in Select widget to colorize each <option>.

View File

@@ -252,16 +252,3 @@ def isodatetime(value, spec='seconds'):
else:
return ''
return mark_safe(f'<span title="{naturaltime(value)}">{text}</span>')
@register.filter
def truncate_middle(value, length):
if len(value) <= length:
return value
# Calculate split points for the two parts
half_len = (length - 1) // 2 # 1 for the ellipsis
first_part = value[:half_len]
second_part = value[len(value) - (length - 1 - half_len):]
return mark_safe(f"{first_part}&hellip;{second_part}")