Merge branch 'feature' into 18423-django-storages

This commit is contained in:
Arthur 2025-03-04 11:00:51 -08:00
commit 65e6e3c87a
82 changed files with 1051 additions and 778 deletions

View File

@ -64,7 +64,7 @@ Email is sent from NetBox only for critical events or if configured for [logging
## HTTP_PROXIES
Default: None
Default: Empty
A dictionary of HTTP proxies to use for outbound requests originating from NetBox (e.g. when sending webhook requests). Proxies should be specified by schema (HTTP and HTTPS) as per the [Python requests library documentation](https://requests.readthedocs.io/en/latest/user/advanced/#proxies). For example:
@ -75,6 +75,8 @@ HTTP_PROXIES = {
}
```
If more flexibility is needed in determining which proxy to use for a given request, consider implementing one or more custom proxy routers via the [`PROXY_ROUTERS`](#proxy_routers) parameter.
---
## INTERNAL_IPS
@ -160,6 +162,16 @@ The file path to the location where media files (such as image attachments) are
---
## PROXY_ROUTERS
Default: `["utilities.proxy.DefaultProxyRouter"]`
A list of Python classes responsible for determining which proxy server(s) to use for outbound HTTP requests. Each item in the list can be the class itself or the dotted path to the class.
The `route()` method on each class must return a dictionary of candidate proxies arranged by protocol (e.g. `http` and/or `https`), or None if no viable proxy can be determined. The default class, `DefaultProxyRouter`, simply returns the content of [`HTTP_PROXIES`](#http_proxies).
---
## REPORTS_ROOT
Default: `$INSTALL_ROOT/netbox/reports/`

View File

@ -44,6 +44,12 @@ A set of rules (one per line) identifying filenames to ignore during synchroniza
| `*.txt` | Ignore any files with a `.txt` extension |
| `data???.json` | Ignore e.g. `data123.json` |
### Sync Interval
!!! info "This field was introduced in NetBox v4.3."
The interval at which the data source should automatically synchronize. If not set, the data source must be synchronized manually.
### Last Synced
The date and time at which the source was most recently synchronized successfully.

View File

@ -29,6 +29,19 @@ An alternative physical label identifying the power outlet.
The type of power outlet.
### Status
The operational status of the power outlet. By default, the following statuses are available:
* Enabled
* Disabled
* Faulty
!!! tip "Custom power outlet statuses"
Additional power outlet statuses may be defined by setting `PowerOutlet.status` under the [`FIELD_CHOICES`](../../configuration/data-validation.md#field_choices) configuration parameter.
!!! info "This field was introduced in NetBox v4.3."
### Color
!!! info "This field was introduced in NetBox v4.2."

View File

@ -1,16 +0,0 @@
# Branches
!!! danger "Deprecated Feature"
This feature has been deprecated in NetBox v4.2 and will be removed in a future release. Please consider using the [netbox-branching plugin](https://github.com/netboxlabs/netbox-branching), which provides much more robust functionality.
A branch is a collection of related [staged changes](./stagedchange.md) that have been prepared for merging into the active database. A branch can be merged by executing its `commit()` method. Deleting a branch will delete all its related changes.
## Fields
### Name
The branch's name.
### User
The user to which the branch belongs (optional).

View File

@ -1,29 +0,0 @@
# Staged Changes
!!! danger "Deprecated Feature"
This feature has been deprecated in NetBox v4.2 and will be removed in a future release. Please consider using the [netbox-branching plugin](https://github.com/netboxlabs/netbox-branching), which provides much more robust functionality.
A staged change represents the creation of a new object or the modification or deletion of an existing object to be performed at some future point. Each change must be assigned to a [branch](./branch.md).
Changes can be applied individually via the `apply()` method, however it is recommended to apply changes in bulk using the parent branch's `commit()` method.
## Fields
!!! warning
Staged changes are not typically created or manipulated directly, but rather effected through the use of the [`checkout()`](../../plugins/development/staged-changes.md) context manager.
### Branch
The [branch](./branch.md) to which this change belongs.
### Action
The type of action this change represents: `create`, `update`, or `delete`.
### Object
A generic foreign key referencing the existing object to which this change applies.
### Data
JSON representation of the changes being made to the object (not applicable for deletions).

View File

@ -1,39 +0,0 @@
# Staged Changes
!!! danger "Deprecated Feature"
This feature has been deprecated in NetBox v4.2 and will be removed in a future release. Please consider using the [netbox-branching plugin](https://github.com/netboxlabs/netbox-branching), which provides much more robust functionality.
NetBox provides a programmatic API to stage the creation, modification, and deletion of objects without actually committing those changes to the active database. This can be useful for performing a "dry run" of bulk operations, or preparing a set of changes for administrative approval, for example.
To begin staging changes, first create a [branch](../../models/extras/branch.md):
```python
from extras.models import Branch
branch1 = Branch.objects.create(name='branch1')
```
Then, activate the branch using the `checkout()` context manager and begin making your changes. This initiates a new database transaction.
```python
from extras.models import Branch
from netbox.staging import checkout
branch1 = Branch.objects.get(name='branch1')
with checkout(branch1):
Site.objects.create(name='New Site', slug='new-site')
# ...
```
Upon exiting the context, the database transaction is automatically rolled back and your changes recorded as [staged changes](../../models/extras/stagedchange.md). Re-entering a branch will trigger a new database transaction and automatically apply any staged changes associated with the branch.
To apply the changes within a branch, call the branch's `commit()` method:
```python
from extras.models import Branch
branch1 = Branch.objects.get(name='branch1')
branch1.commit()
```
Committing a branch is an all-or-none operation: Any exceptions will revert the entire set of changes. After successfully committing a branch, all its associated StagedChange objects are automatically deleted (however the branch itself will remain and can be reused).

View File

@ -150,7 +150,6 @@ nav:
- GraphQL API: 'plugins/development/graphql-api.md'
- Background Jobs: 'plugins/development/background-jobs.md'
- Dashboard Widgets: 'plugins/development/dashboard-widgets.md'
- Staged Changes: 'plugins/development/staged-changes.md'
- Exceptions: 'plugins/development/exceptions.md'
- Migrating to v4.0: 'plugins/development/migration-v4.md'
- Administration:
@ -226,7 +225,6 @@ nav:
- VirtualDeviceContext: 'models/dcim/virtualdevicecontext.md'
- Extras:
- Bookmark: 'models/extras/bookmark.md'
- Branch: 'models/extras/branch.md'
- ConfigContext: 'models/extras/configcontext.md'
- ConfigTemplate: 'models/extras/configtemplate.md'
- CustomField: 'models/extras/customfield.md'
@ -239,7 +237,6 @@ nav:
- Notification: 'models/extras/notification.md'
- NotificationGroup: 'models/extras/notificationgroup.md'
- SavedFilter: 'models/extras/savedfilter.md'
- StagedChange: 'models/extras/stagedchange.md'
- Subscription: 'models/extras/subscription.md'
- Tag: 'models/extras/tag.md'
- Webhook: 'models/extras/webhook.md'

View File

@ -26,8 +26,8 @@ class DataSourceSerializer(NetBoxModelSerializer):
model = DataSource
fields = [
'id', 'url', 'display_url', 'display', 'name', 'type', 'source_url', 'enabled', 'status', 'description',
'parameters', 'ignore_rules', 'comments', 'custom_fields', 'created', 'last_updated', 'last_synced',
'file_count',
'sync_interval', 'parameters', 'ignore_rules', 'comments', 'custom_fields', 'created', 'last_updated',
'last_synced', 'file_count',
]
brief_fields = ('id', 'url', 'display', 'name', 'description')

View File

@ -7,13 +7,13 @@ from pathlib import Path
from urllib.parse import urlparse
from django import forms
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.utils.translation import gettext as _
from netbox.data_backends import DataBackend
from netbox.utils import register_data_backend
from utilities.constants import HTTP_PROXY_SUPPORTED_SCHEMAS, HTTP_PROXY_SUPPORTED_SOCK_SCHEMAS
from utilities.proxy import resolve_proxies
from utilities.socks import ProxyPoolManager
from .exceptions import SyncError
@ -70,18 +70,18 @@ class GitBackend(DataBackend):
# Initialize backend config
config = ConfigDict()
self.use_socks = False
self.socks_proxy = None
# Apply HTTP proxy (if configured)
if settings.HTTP_PROXIES:
if proxy := settings.HTTP_PROXIES.get(self.url_scheme, None):
if urlparse(proxy).scheme not in HTTP_PROXY_SUPPORTED_SCHEMAS:
raise ImproperlyConfigured(f"Unsupported Git DataSource proxy scheme: {urlparse(proxy).scheme}")
proxies = resolve_proxies(url=self.url, context={'client': self}) or {}
if proxy := proxies.get(self.url_scheme):
if urlparse(proxy).scheme not in HTTP_PROXY_SUPPORTED_SCHEMAS:
raise ImproperlyConfigured(f"Unsupported Git DataSource proxy scheme: {urlparse(proxy).scheme}")
if self.url_scheme in ('http', 'https'):
config.set("http", "proxy", proxy)
if urlparse(proxy).scheme in HTTP_PROXY_SUPPORTED_SOCK_SCHEMAS:
self.use_socks = True
if self.url_scheme in ('http', 'https'):
config.set("http", "proxy", proxy)
if urlparse(proxy).scheme in HTTP_PROXY_SUPPORTED_SOCK_SCHEMAS:
self.socks_proxy = proxy
return config
@ -98,8 +98,8 @@ class GitBackend(DataBackend):
}
# check if using socks for proxy - if so need to use custom pool_manager
if self.use_socks:
clone_args['pool_manager'] = ProxyPoolManager(settings.HTTP_PROXIES.get(self.url_scheme))
if self.socks_proxy:
clone_args['pool_manager'] = ProxyPoolManager(self.socks_proxy)
if self.url_scheme in ('http', 'https'):
if self.params.get('username'):
@ -147,7 +147,7 @@ class S3Backend(DataBackend):
# Initialize backend config
return Boto3Config(
proxies=settings.HTTP_PROXIES,
proxies=resolve_proxies(url=self.url, context={'client': self}),
)
@contextmanager

View File

@ -29,6 +29,10 @@ class DataSourceFilterSet(NetBoxModelFilterSet):
choices=DataSourceStatusChoices,
null_value=None
)
sync_interval = django_filters.MultipleChoiceFilter(
choices=JobIntervalChoices,
null_value=None
)
class Meta:
model = DataSource

View File

@ -1,6 +1,7 @@
from django import forms
from django.utils.translation import gettext_lazy as _
from core.choices import JobIntervalChoices
from core.models import *
from netbox.forms import NetBoxModelBulkEditForm
from netbox.utils import get_data_backend_choices
@ -29,6 +30,11 @@ class DataSourceBulkEditForm(NetBoxModelBulkEditForm):
max_length=200,
required=False
)
sync_interval = forms.ChoiceField(
choices=JobIntervalChoices,
required=False,
label=_('Sync interval')
)
comments = CommentField()
parameters = forms.JSONField(
label=_('Parameters'),
@ -42,8 +48,8 @@ class DataSourceBulkEditForm(NetBoxModelBulkEditForm):
model = DataSource
fieldsets = (
FieldSet('type', 'enabled', 'description', 'comments', 'parameters', 'ignore_rules'),
FieldSet('type', 'enabled', 'description', 'sync_interval', 'parameters', 'ignore_rules', 'comments'),
)
nullable_fields = (
'description', 'description', 'parameters', 'comments', 'parameters', 'ignore_rules',
'description', 'description', 'sync_interval', 'parameters', 'parameters', 'ignore_rules' 'comments',
)

View File

@ -11,5 +11,6 @@ class DataSourceImportForm(NetBoxModelImportForm):
class Meta:
model = DataSource
fields = (
'name', 'type', 'source_url', 'enabled', 'description', 'comments', 'parameters', 'ignore_rules',
'name', 'type', 'source_url', 'enabled', 'description', 'sync_interval', 'parameters', 'ignore_rules',
'comments',
)

View File

@ -27,7 +27,7 @@ class DataSourceFilterForm(NetBoxModelFilterSetForm):
model = DataSource
fieldsets = (
FieldSet('q', 'filter_id'),
FieldSet('type', 'status', name=_('Data Source')),
FieldSet('type', 'status', 'enabled', 'sync_interval', name=_('Data Source')),
)
type = forms.MultipleChoiceField(
label=_('Type'),
@ -46,6 +46,11 @@ class DataSourceFilterForm(NetBoxModelFilterSetForm):
choices=BOOLEAN_WITH_BLANK_CHOICES
)
)
sync_interval = forms.ChoiceField(
label=_('Sync interval'),
choices=JobIntervalChoices,
required=False
)
class DataFileFilterForm(NetBoxModelFilterSetForm):

View File

@ -36,7 +36,7 @@ class DataSourceForm(NetBoxModelForm):
class Meta:
model = DataSource
fields = [
'name', 'type', 'source_url', 'enabled', 'description', 'comments', 'ignore_rules', 'tags',
'name', 'type', 'source_url', 'enabled', 'description', 'sync_interval', 'ignore_rules', 'comments', 'tags',
]
widgets = {
'ignore_rules': forms.Textarea(
@ -51,7 +51,10 @@ class DataSourceForm(NetBoxModelForm):
@property
def fieldsets(self):
fieldsets = [
FieldSet('name', 'type', 'source_url', 'enabled', 'description', 'tags', 'ignore_rules', name=_('Source')),
FieldSet(
'name', 'type', 'source_url', 'description', 'tags', 'ignore_rules', name=_('Source')
),
FieldSet('enabled', 'sync_interval', name=_('Sync')),
]
if self.backend_fields:
fieldsets.append(

View File

@ -5,6 +5,7 @@ import sys
from django.conf import settings
from netbox.jobs import JobRunner, system_job
from netbox.search.backends import search_backend
from utilities.proxy import resolve_proxies
from .choices import DataSourceStatusChoices, JobIntervalChoices
from .exceptions import SyncError
from .models import DataSource
@ -71,7 +72,7 @@ class SystemHousekeepingJob(JobRunner):
url=settings.CENSUS_URL,
params=census_data,
timeout=3,
proxies=settings.HTTP_PROXIES
proxies=resolve_proxies(url=settings.CENSUS_URL)
)
except requests.exceptions.RequestException:
pass

View File

@ -0,0 +1,18 @@
# Generated by Django 5.1.6 on 2025-02-26 19:45
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('core', '0012_job_object_type_optional'),
]
operations = [
migrations.AddField(
model_name='datasource',
name='sync_interval',
field=models.PositiveSmallIntegerField(blank=True, null=True),
),
]

View File

@ -59,6 +59,12 @@ class DataSource(JobsMixin, PrimaryModel):
verbose_name=_('enabled'),
default=True
)
sync_interval = models.PositiveSmallIntegerField(
verbose_name=_('sync interval'),
choices=JobIntervalChoices,
blank=True,
null=True
)
ignore_rules = models.TextField(
verbose_name=_('ignore rules'),
blank=True,

View File

@ -11,6 +11,7 @@ from django.core.cache import cache
from netbox.plugins import PluginConfig
from netbox.registry import registry
from utilities.datetime import datetime_from_timestamp
from utilities.proxy import resolve_proxies
USER_AGENT_STRING = f'NetBox/{settings.RELEASE.version} {settings.RELEASE.edition}'
CACHE_KEY_CATALOG_FEED = 'plugins-catalog-feed'
@ -120,10 +121,11 @@ def get_catalog_plugins():
def get_pages():
# TODO: pagination is currently broken in API
payload = {'page': '1', 'per_page': '50'}
proxies = resolve_proxies(url=settings.PLUGIN_CATALOG_URL)
first_page = session.get(
settings.PLUGIN_CATALOG_URL,
headers={'User-Agent': USER_AGENT_STRING},
proxies=settings.HTTP_PROXIES,
proxies=proxies,
timeout=3,
params=payload
).json()
@ -135,7 +137,7 @@ def get_catalog_plugins():
next_page = session.get(
settings.PLUGIN_CATALOG_URL,
headers={'User-Agent': USER_AGENT_STRING},
proxies=settings.HTTP_PROXIES,
proxies=proxies,
timeout=3,
params=payload
).json()

View File

@ -8,16 +8,15 @@ from django.dispatch import receiver, Signal
from django.utils.translation import gettext_lazy as _
from django_prometheus.models import model_deletes, model_inserts, model_updates
from core.choices import ObjectChangeActionChoices
from core.choices import JobStatusChoices, ObjectChangeActionChoices
from core.events import *
from core.models import ObjectChange
from extras.events import enqueue_event
from extras.utils import run_validators
from netbox.config import get_config
from netbox.context import current_request, events_queue
from netbox.models.features import ChangeLoggingMixin
from utilities.exceptions import AbortRequest
from .models import ConfigRevision
from .models import ConfigRevision, DataSource, ObjectChange
__all__ = (
'clear_events',
@ -182,6 +181,25 @@ def clear_events_queue(sender, **kwargs):
# DataSource handlers
#
@receiver(post_save, sender=DataSource)
def enqueue_sync_job(instance, created, **kwargs):
"""
When a DataSource is saved, check its sync_interval and enqueue a sync job if appropriate.
"""
from .jobs import SyncDataSourceJob
if instance.enabled and instance.sync_interval:
SyncDataSourceJob.enqueue_once(instance, interval=instance.sync_interval)
elif not created:
# Delete any previously scheduled recurring jobs for this DataSource
for job in SyncDataSourceJob.get_jobs(instance).defer('data').filter(
interval__isnull=False,
status=JobStatusChoices.STATUS_SCHEDULED
):
# Call delete() per instance to ensure the associated background task is deleted as well
job.delete()
@receiver(post_sync)
def auto_sync(instance, **kwargs):
"""

View File

@ -25,6 +25,9 @@ class DataSourceTable(NetBoxTable):
enabled = columns.BooleanColumn(
verbose_name=_('Enabled'),
)
sync_interval = columns.ChoiceFieldColumn(
verbose_name=_('Sync interval'),
)
tags = columns.TagColumn(
url_name='core:datasource_list'
)
@ -35,10 +38,10 @@ class DataSourceTable(NetBoxTable):
class Meta(NetBoxTable.Meta):
model = DataSource
fields = (
'pk', 'id', 'name', 'type', 'status', 'enabled', 'source_url', 'description', 'comments', 'parameters',
'created', 'last_updated', 'file_count',
'pk', 'id', 'name', 'type', 'status', 'enabled', 'source_url', 'description', 'sync_interval', 'comments',
'parameters', 'created', 'last_updated', 'file_count',
)
default_columns = ('pk', 'name', 'type', 'status', 'enabled', 'description', 'file_count')
default_columns = ('pk', 'name', 'type', 'status', 'enabled', 'description', 'sync_interval', 'file_count')
class DataFileTable(NetBoxTable):

View File

@ -27,7 +27,8 @@ class DataSourceTestCase(TestCase, ChangeLoggedFilterSetTests):
source_url='file:///var/tmp/source1/',
status=DataSourceStatusChoices.NEW,
enabled=True,
description='foobar1'
description='foobar1',
sync_interval=JobIntervalChoices.INTERVAL_HOURLY
),
DataSource(
name='Data Source 2',
@ -35,14 +36,16 @@ class DataSourceTestCase(TestCase, ChangeLoggedFilterSetTests):
source_url='file:///var/tmp/source2/',
status=DataSourceStatusChoices.SYNCING,
enabled=True,
description='foobar2'
description='foobar2',
sync_interval=JobIntervalChoices.INTERVAL_DAILY
),
DataSource(
name='Data Source 3',
type='git',
source_url='https://example.com/git/source3',
status=DataSourceStatusChoices.COMPLETED,
enabled=False
enabled=False,
sync_interval=JobIntervalChoices.INTERVAL_WEEKLY
),
)
DataSource.objects.bulk_create(data_sources)
@ -73,6 +76,10 @@ class DataSourceTestCase(TestCase, ChangeLoggedFilterSetTests):
params = {'status': [DataSourceStatusChoices.NEW, DataSourceStatusChoices.SYNCING]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
def test_sync_interval(self):
params = {'sync_interval': [JobIntervalChoices.INTERVAL_HOURLY, JobIntervalChoices.INTERVAL_DAILY]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
class DataFileTestCase(TestCase, ChangeLoggedFilterSetTests):
queryset = DataFile.objects.all()

View File

@ -1,3 +1,4 @@
from django.utils.translation import gettext as _
from django.contrib.contenttypes.models import ContentType
from drf_spectacular.utils import extend_schema_field
from rest_framework import serializers
@ -155,10 +156,10 @@ class PowerOutletSerializer(NetBoxModelSerializer, CabledObjectSerializer, Conne
class Meta:
model = PowerOutlet
fields = [
'id', 'url', 'display_url', 'display', 'device', 'module', 'name', 'label', 'type', 'color', 'power_port',
'feed_leg', 'description', 'mark_connected', 'cable', 'cable_end', 'link_peers', 'link_peers_type',
'connected_endpoints', 'connected_endpoints_type', 'connected_endpoints_reachable', 'tags', 'custom_fields',
'created', 'last_updated', '_occupied',
'id', 'url', 'display_url', 'display', 'device', 'module', 'name', 'label', 'type', 'status', 'color',
'power_port', 'feed_leg', 'description', 'mark_connected', 'cable', 'cable_end', 'link_peers',
'link_peers_type', 'connected_endpoints', 'connected_endpoints_type', 'connected_endpoints_reachable',
'tags', 'custom_fields', 'created', 'last_updated', '_occupied',
]
brief_fields = ('id', 'url', 'display', 'device', 'name', 'description', 'cable', '_occupied')
@ -232,8 +233,56 @@ class InterfaceSerializer(NetBoxModelSerializer, CabledObjectSerializer, Connect
def validate(self, data):
# Validate many-to-many VLAN assignments
if not self.nested:
# Validate 802.1q mode and vlan(s)
mode = None
tagged_vlans = []
# Gather Information
if self.instance:
mode = data.get('mode') if 'mode' in data.keys() else self.instance.mode
untagged_vlan = data.get('untagged_vlan') if 'untagged_vlan' in data.keys() else \
self.instance.untagged_vlan
qinq_svlan = data.get('qinq_svlan') if 'qinq_svlan' in data.keys() else \
self.instance.qinq_svlan
tagged_vlans = data.get('tagged_vlans') if 'tagged_vlans' in data.keys() else \
self.instance.tagged_vlans.all()
else:
mode = data.get('mode', None)
untagged_vlan = data.get('untagged_vlan') if 'untagged_vlan' in data.keys() else None
qinq_svlan = data.get('qinq_svlan') if 'qinq_svlan' in data.keys() else None
tagged_vlans = data.get('tagged_vlans') if 'tagged_vlans' in data.keys() else None
errors = {}
# Non Q-in-Q mode with service vlan set
if mode != InterfaceModeChoices.MODE_Q_IN_Q and qinq_svlan:
errors.update({
'qinq_svlan': _("Interface mode does not support q-in-q service vlan")
})
# Routed mode
if not mode:
# Untagged vlan
if untagged_vlan:
errors.update({
'untagged_vlan': _("Interface mode does not support untagged vlan")
})
# Tagged vlan
if tagged_vlans:
errors.update({
'tagged_vlans': _("Interface mode does not support tagged vlans")
})
# Non-tagged mode
elif mode in (InterfaceModeChoices.MODE_TAGGED_ALL, InterfaceModeChoices.MODE_ACCESS) and tagged_vlans:
errors.update({
'tagged_vlans': _("Interface mode does not support tagged vlans")
})
if errors:
raise serializers.ValidationError(errors)
# Validate many-to-many VLAN assignments
device = self.instance.device if self.instance else data.get('device')
for vlan in data.get('tagged_vlans', []):
if vlan.site not in [device.site, None]:

View File

@ -1627,6 +1627,23 @@ class PowerFeedPhaseChoices(ChoiceSet):
)
#
# PowerOutlets
#
class PowerOutletStatusChoices(ChoiceSet):
key = 'PowerOutlet.status'
STATUS_ENABLED = 'enabled'
STATUS_DISABLED = 'disabled'
STATUS_FAULTY = 'faulty'
CHOICES = [
(STATUS_ENABLED, _('Enabled'), 'green'),
(STATUS_DISABLED, _('Disabled'), 'red'),
(STATUS_FAULTY, _('Faulty'), 'gray'),
]
#
# VDC
#

View File

@ -1591,11 +1591,15 @@ class PowerOutletFilterSet(
queryset=PowerPort.objects.all(),
label=_('Power port (ID)'),
)
status = django_filters.MultipleChoiceFilter(
choices=PowerOutletStatusChoices,
null_value=None
)
class Meta:
model = PowerOutlet
fields = (
'id', 'name', 'label', 'feed_leg', 'description', 'color', 'mark_connected', 'cable_end',
'id', 'name', 'status', 'label', 'feed_leg', 'description', 'color', 'mark_connected', 'cable_end',
)

View File

@ -1379,7 +1379,10 @@ class PowerPortBulkEditForm(
class PowerOutletBulkEditForm(
ComponentBulkEditForm,
form_from_model(PowerOutlet, ['label', 'type', 'color', 'feed_leg', 'power_port', 'mark_connected', 'description'])
form_from_model(
PowerOutlet,
['label', 'type', 'status', 'color', 'feed_leg', 'power_port', 'mark_connected', 'description']
)
):
mark_connected = forms.NullBooleanField(
label=_('Mark connected'),
@ -1389,7 +1392,7 @@ class PowerOutletBulkEditForm(
model = PowerOutlet
fieldsets = (
FieldSet('module', 'type', 'label', 'description', 'mark_connected', 'color'),
FieldSet('module', 'type', 'label', 'status', 'description', 'mark_connected', 'color'),
FieldSet('feed_leg', 'power_port', name=_('Power')),
)
nullable_fields = ('module', 'label', 'type', 'feed_leg', 'power_port', 'description')

View File

@ -43,20 +43,14 @@ class InterfaceCommonForm(forms.Form):
super().clean()
parent_field = 'device' if 'device' in self.cleaned_data else 'virtual_machine'
tagged_vlans = self.cleaned_data.get('tagged_vlans')
# Untagged interfaces cannot be assigned tagged VLANs
if self.cleaned_data['mode'] == InterfaceModeChoices.MODE_ACCESS and tagged_vlans:
raise forms.ValidationError({
'mode': _("An access interface cannot have tagged VLANs assigned.")
})
# Remove all tagged VLAN assignments from "tagged all" interfaces
elif self.cleaned_data['mode'] == InterfaceModeChoices.MODE_TAGGED_ALL:
self.cleaned_data['tagged_vlans'] = []
if 'tagged_vlans' in self.fields.keys():
tagged_vlans = self.cleaned_data.get('tagged_vlans') if self.is_bound else \
self.get_initial_for_field(self.fields['tagged_vlans'], 'tagged_vlans')
else:
tagged_vlans = []
# Validate tagged VLANs; must be a global VLAN or in the same site
elif self.cleaned_data['mode'] == InterfaceModeChoices.MODE_TAGGED and tagged_vlans:
if self.cleaned_data['mode'] == InterfaceModeChoices.MODE_TAGGED and tagged_vlans:
valid_sites = [None, self.cleaned_data[parent_field].site]
invalid_vlans = [str(v) for v in tagged_vlans if v.site not in valid_sites]

View File

@ -1305,7 +1305,7 @@ class PowerOutletFilterForm(PathEndpointFilterForm, DeviceComponentFilterForm):
model = PowerOutlet
fieldsets = (
FieldSet('q', 'filter_id', 'tag'),
FieldSet('name', 'label', 'type', 'color', name=_('Attributes')),
FieldSet('name', 'label', 'type', 'color', 'status', name=_('Attributes')),
FieldSet('region_id', 'site_group_id', 'site_id', 'location_id', 'rack_id', name=_('Location')),
FieldSet(
'device_type_id', 'device_role_id', 'device_id', 'device_status', 'virtual_chassis_id',
@ -1323,6 +1323,11 @@ class PowerOutletFilterForm(PathEndpointFilterForm, DeviceComponentFilterForm):
label=_('Color'),
required=False
)
status = forms.MultipleChoiceField(
label=_('Status'),
choices=PowerOutletStatusChoices,
required=False
)
class InterfaceFilterForm(PathEndpointFilterForm, DeviceComponentFilterForm):

View File

@ -1308,7 +1308,7 @@ class PowerOutletForm(ModularDeviceComponentForm):
fieldsets = (
FieldSet(
'device', 'module', 'name', 'label', 'type', 'color', 'power_port', 'feed_leg', 'mark_connected',
'device', 'module', 'name', 'label', 'type', 'status', 'color', 'power_port', 'feed_leg', 'mark_connected',
'description', 'tags',
),
)
@ -1316,7 +1316,7 @@ class PowerOutletForm(ModularDeviceComponentForm):
class Meta:
model = PowerOutlet
fields = [
'device', 'module', 'name', 'label', 'type', 'color', 'power_port', 'feed_leg', 'mark_connected',
'device', 'module', 'name', 'label', 'type', 'status', 'color', 'power_port', 'feed_leg', 'mark_connected',
'description', 'tags',
]

View File

@ -0,0 +1,16 @@
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('dcim', '0200_populate_mac_addresses'),
]
operations = [
migrations.AddField(
model_name='poweroutlet',
name='status',
field=models.CharField(default='enabled', max_length=50),
),
]

View File

@ -449,6 +449,12 @@ class PowerOutlet(ModularComponentModel, CabledObjectModel, PathEndpoint, Tracki
"""
A physical power outlet (output) within a Device which provides power to a PowerPort.
"""
status = models.CharField(
verbose_name=_('status'),
max_length=50,
choices=PowerOutletStatusChoices,
default=PowerOutletStatusChoices.STATUS_ENABLED
)
type = models.CharField(
verbose_name=_('type'),
max_length=50,
@ -492,6 +498,9 @@ class PowerOutlet(ModularComponentModel, CabledObjectModel, PathEndpoint, Tracki
_("Parent power port ({power_port}) must belong to the same device").format(power_port=self.power_port)
)
def get_status_color(self):
return PowerOutletStatusChoices.colors.get(self.status)
#
# Interfaces
@ -934,6 +943,8 @@ class Interface(ModularComponentModel, BaseInterface, CabledObjectModel, PathEnd
raise ValidationError({'rf_channel_width': _("Cannot specify custom width with channel selected.")})
# VLAN validation
if not self.mode and self.untagged_vlan:
raise ValidationError({'untagged_vlan': _("Interface mode does not support an untagged vlan.")})
# Validate untagged VLAN
if self.untagged_vlan and self.untagged_vlan.site not in [self.device.site, None]:

View File

@ -224,7 +224,7 @@ class PowerOutletIndex(SearchIndex):
('label', 200),
('description', 500),
)
display_attrs = ('device', 'label', 'type', 'description')
display_attrs = ('device', 'label', 'type', 'status', 'description')
@register_search

View File

@ -520,6 +520,9 @@ class PowerOutletTable(ModularDeviceComponentTable, PathEndpointTable):
verbose_name=_('Power Port'),
linkify=True
)
status = columns.ChoiceFieldColumn(
verbose_name=_('Status'),
)
color = columns.ColorColumn()
tags = columns.TagColumn(
url_name='dcim:poweroutlet_list'
@ -530,9 +533,11 @@ class PowerOutletTable(ModularDeviceComponentTable, PathEndpointTable):
fields = (
'pk', 'id', 'name', 'device', 'module_bay', 'module', 'label', 'type', 'description', 'power_port',
'color', 'feed_leg', 'mark_connected', 'cable', 'cable_color', 'link_peer', 'connection', 'inventory_items',
'tags', 'created', 'last_updated',
'tags', 'created', 'last_updated', 'status',
)
default_columns = (
'pk', 'name', 'device', 'label', 'type', 'status', 'color', 'power_port', 'feed_leg', 'description',
)
default_columns = ('pk', 'name', 'device', 'label', 'type', 'color', 'power_port', 'feed_leg', 'description')
class DevicePowerOutletTable(PowerOutletTable):
@ -550,9 +555,11 @@ class DevicePowerOutletTable(PowerOutletTable):
fields = (
'pk', 'id', 'name', 'module_bay', 'module', 'label', 'type', 'color', 'power_port', 'feed_leg',
'description', 'mark_connected', 'cable', 'cable_color', 'link_peer', 'connection', 'tags', 'actions',
'status',
)
default_columns = (
'pk', 'name', 'label', 'type', 'color', 'power_port', 'feed_leg', 'description', 'cable', 'connection',
'pk', 'name', 'label', 'type', 'status', 'color', 'power_port', 'feed_leg', 'description', 'cable',
'connection',
)

View File

@ -1,3 +1,5 @@
import json
from django.test import override_settings
from django.urls import reverse
from django.utils.translation import gettext as _
@ -1748,6 +1750,23 @@ class InterfaceTest(Mixins.ComponentTraceMixin, APIViewTestCases.APIViewTestCase
},
]
def _perform_interface_test_with_invalid_data(self, mode: str = None, invalid_data: dict = {}):
device = Device.objects.first()
data = {
'device': device.pk,
'name': 'Interface 1',
'type': InterfaceTypeChoices.TYPE_1GE_FIXED,
}
data.update({'mode': mode})
data.update(invalid_data)
response = self.client.post(self._get_list_url(), data, format='json', **self.header)
self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
content = json.loads(response.content)
for key in invalid_data.keys():
self.assertIn(key, content)
self.assertIsNone(content.get('data'))
def test_bulk_delete_child_interfaces(self):
interface1 = Interface.objects.get(name='Interface 1')
device = interface1.device
@ -1775,6 +1794,57 @@ class InterfaceTest(Mixins.ComponentTraceMixin, APIViewTestCases.APIViewTestCase
self.client.delete(self._get_list_url(), data, format='json', **self.header)
self.assertEqual(device.interfaces.count(), 2) # Child & parent were both deleted
def test_create_child_interfaces_mode_invalid_data(self):
"""
POST data to test interface mode check and invalid tagged/untagged VLANS.
"""
self.add_permissions('dcim.add_interface')
vlans = VLAN.objects.all()[0:3]
# Routed mode, untagged, tagged and qinq service vlan
invalid_data = {
'untagged_vlan': vlans[0].pk,
'tagged_vlans': [vlans[1].pk, vlans[2].pk],
'qinq_svlan': vlans[2].pk
}
self._perform_interface_test_with_invalid_data(None, invalid_data)
# Routed mode, untagged and tagged vlan
invalid_data = {
'untagged_vlan': vlans[0].pk,
'tagged_vlans': [vlans[1].pk, vlans[2].pk],
}
self._perform_interface_test_with_invalid_data(None, invalid_data)
# Routed mode, untagged vlan
invalid_data = {
'untagged_vlan': vlans[0].pk,
}
self._perform_interface_test_with_invalid_data(None, invalid_data)
invalid_data = {
'tagged_vlans': [vlans[1].pk, vlans[2].pk],
}
# Routed mode, qinq service vlan
self._perform_interface_test_with_invalid_data(None, invalid_data)
# Access mode, tagged vlans
self._perform_interface_test_with_invalid_data(InterfaceModeChoices.MODE_ACCESS, invalid_data)
# All tagged mode, tagged vlans
self._perform_interface_test_with_invalid_data(InterfaceModeChoices.MODE_TAGGED_ALL, invalid_data)
invalid_data = {
'qinq_svlan': vlans[0].pk,
}
# Routed mode, qinq service vlan
self._perform_interface_test_with_invalid_data(None, invalid_data)
# Access mode, qinq service vlan
self._perform_interface_test_with_invalid_data(InterfaceModeChoices.MODE_ACCESS, invalid_data)
# Tagged mode, qinq service vlan
self._perform_interface_test_with_invalid_data(InterfaceModeChoices.MODE_TAGGED, invalid_data)
# Tagged-all mode, qinq service vlan
self._perform_interface_test_with_invalid_data(InterfaceModeChoices.MODE_TAGGED_ALL, invalid_data)
class FrontPortTest(APIViewTestCases.APIViewTestCase):
model = FrontPort

View File

@ -3684,6 +3684,7 @@ class PowerOutletTestCase(TestCase, DeviceComponentFilterSetTests, ChangeLoggedF
feed_leg=PowerOutletFeedLegChoices.FEED_LEG_A,
description='First',
color='ff0000',
status=PowerOutletStatusChoices.STATUS_ENABLED,
),
PowerOutlet(
device=devices[1],
@ -3693,6 +3694,7 @@ class PowerOutletTestCase(TestCase, DeviceComponentFilterSetTests, ChangeLoggedF
feed_leg=PowerOutletFeedLegChoices.FEED_LEG_B,
description='Second',
color='00ff00',
status=PowerOutletStatusChoices.STATUS_DISABLED,
),
PowerOutlet(
device=devices[2],
@ -3702,6 +3704,7 @@ class PowerOutletTestCase(TestCase, DeviceComponentFilterSetTests, ChangeLoggedF
feed_leg=PowerOutletFeedLegChoices.FEED_LEG_C,
description='Third',
color='0000ff',
status=PowerOutletStatusChoices.STATUS_FAULTY,
),
)
PowerOutlet.objects.bulk_create(power_outlets)
@ -3796,6 +3799,23 @@ class PowerOutletTestCase(TestCase, DeviceComponentFilterSetTests, ChangeLoggedF
params = {'connected': False}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
def test_status(self):
params = {'status': [PowerOutletStatusChoices.STATUS_ENABLED]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
params = {'status': [PowerOutletStatusChoices.STATUS_DISABLED]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
params = {'status': [PowerOutletStatusChoices.STATUS_FAULTY]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
params = {'status': [
PowerOutletStatusChoices.STATUS_ENABLED,
PowerOutletStatusChoices.STATUS_DISABLED,
PowerOutletStatusChoices.STATUS_FAULTY,
]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
class InterfaceTestCase(TestCase, DeviceComponentFilterSetTests, ChangeLoggedFilterSetTests):
queryset = Interface.objects.all()

View File

@ -1,8 +1,11 @@
from django.test import TestCase
from dcim.choices import DeviceFaceChoices, DeviceStatusChoices, InterfaceTypeChoices
from dcim.choices import (
DeviceFaceChoices, DeviceStatusChoices, InterfaceTypeChoices, InterfaceModeChoices, PowerOutletStatusChoices
)
from dcim.forms import *
from dcim.models import *
from ipam.models import VLAN
from utilities.testing import create_test_device
from virtualization.models import Cluster, ClusterGroup, ClusterType
@ -11,6 +14,56 @@ def get_id(model, slug):
return model.objects.get(slug=slug).id
class PowerOutletFormTestCase(TestCase):
@classmethod
def setUpTestData(cls):
cls.site = site = Site.objects.create(name='Site 1', slug='site-1')
cls.manufacturer = manufacturer = Manufacturer.objects.create(name='Manufacturer 1', slug='manufacturer-1')
cls.role = role = DeviceRole.objects.create(
name='Device Role 1', slug='device-role-1', color='ff0000'
)
cls.device_type = device_type = DeviceType.objects.create(
manufacturer=manufacturer, model='Device Type 1', slug='device-type-1', u_height=1
)
cls.rack = rack = Rack.objects.create(name='Rack 1', site=site)
cls.device = Device.objects.create(
name='Device 1', device_type=device_type, role=role, site=site, rack=rack, position=1
)
def test_status_is_required(self):
form = PowerOutletForm(data={
'device': self.device,
'module': None,
'name': 'New Enabled Outlet',
})
self.assertFalse(form.is_valid())
self.assertIn('status', form.errors)
def test_status_must_be_defined_choice(self):
form = PowerOutletForm(data={
'device': self.device,
'module': None,
'name': 'New Enabled Outlet',
'status': 'this isn\'t a defined choice',
})
self.assertFalse(form.is_valid())
self.assertIn('status', form.errors)
self.assertTrue(form.errors['status'][-1].startswith('Select a valid choice.'))
def test_status_recognizes_choices(self):
for index, choice in enumerate(PowerOutletStatusChoices.CHOICES):
form = PowerOutletForm(data={
'device': self.device,
'module': None,
'name': f'New Enabled Outlet {index + 1}',
'status': choice[0],
})
self.assertEqual({}, form.errors)
self.assertTrue(form.is_valid())
instance = form.save()
self.assertEqual(instance.status, choice[0])
class DeviceTestCase(TestCase):
@classmethod
@ -117,11 +170,23 @@ class DeviceTestCase(TestCase):
self.assertIn('position', form.errors)
class LabelTestCase(TestCase):
class InterfaceTestCase(TestCase):
@classmethod
def setUpTestData(cls):
cls.device = create_test_device('Device 1')
cls.vlans = (
VLAN(name='VLAN 1', vid=1),
VLAN(name='VLAN 2', vid=2),
VLAN(name='VLAN 3', vid=3),
)
VLAN.objects.bulk_create(cls.vlans)
cls.interface = Interface.objects.create(
device=cls.device,
name='Interface 1',
type=InterfaceTypeChoices.TYPE_1GE_GBIC,
mode=InterfaceModeChoices.MODE_TAGGED,
)
def test_interface_label_count_valid(self):
"""
@ -151,3 +216,152 @@ class LabelTestCase(TestCase):
self.assertFalse(form.is_valid())
self.assertIn('label', form.errors)
def test_create_interface_mode_valid_data(self):
"""
Test that saving valid interface mode and tagged/untagged vlans works properly
"""
# Validate access mode
data = {
'device': self.device.pk,
'name': 'ethernet1/1',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'mode': InterfaceModeChoices.MODE_ACCESS,
'untagged_vlan': self.vlans[0].pk
}
form = InterfaceCreateForm(data)
self.assertTrue(form.is_valid())
# Validate tagged vlans
data = {
'device': self.device.pk,
'name': 'ethernet1/2',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'mode': InterfaceModeChoices.MODE_TAGGED,
'untagged_vlan': self.vlans[0].pk,
'tagged_vlans': [self.vlans[1].pk, self.vlans[2].pk]
}
form = InterfaceCreateForm(data)
self.assertTrue(form.is_valid())
# Validate tagged vlans
data = {
'device': self.device.pk,
'name': 'ethernet1/3',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'mode': InterfaceModeChoices.MODE_TAGGED_ALL,
'untagged_vlan': self.vlans[0].pk,
}
form = InterfaceCreateForm(data)
self.assertTrue(form.is_valid())
def test_create_interface_mode_access_invalid_data(self):
"""
Test that saving invalid interface mode and tagged/untagged vlans works properly
"""
data = {
'device': self.device.pk,
'name': 'ethernet1/4',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'mode': InterfaceModeChoices.MODE_ACCESS,
'untagged_vlan': self.vlans[0].pk,
'tagged_vlans': [self.vlans[1].pk, self.vlans[2].pk]
}
form = InterfaceCreateForm(data)
self.assertTrue(form.is_valid())
self.assertIn('untagged_vlan', form.cleaned_data.keys())
self.assertNotIn('tagged_vlans', form.cleaned_data.keys())
self.assertNotIn('qinq_svlan', form.cleaned_data.keys())
def test_edit_interface_mode_access_invalid_data(self):
"""
Test that saving invalid interface mode and tagged/untagged vlans works properly
"""
data = {
'device': self.device.pk,
'name': 'Ethernet 1/5',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'mode': InterfaceModeChoices.MODE_ACCESS,
'tagged_vlans': [self.vlans[0].pk, self.vlans[1].pk, self.vlans[2].pk]
}
form = InterfaceForm(data, instance=self.interface)
self.assertTrue(form.is_valid())
self.assertIn('untagged_vlan', form.cleaned_data.keys())
self.assertNotIn('tagged_vlans', form.cleaned_data.keys())
self.assertNotIn('qinq_svlan', form.cleaned_data.keys())
def test_create_interface_mode_tagged_all_invalid_data(self):
"""
Test that saving invalid interface mode and tagged/untagged vlans works properly
"""
data = {
'device': self.device.pk,
'name': 'ethernet1/6',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'mode': InterfaceModeChoices.MODE_TAGGED_ALL,
'tagged_vlans': [self.vlans[0].pk, self.vlans[1].pk, self.vlans[2].pk]
}
form = InterfaceCreateForm(data)
self.assertTrue(form.is_valid())
self.assertIn('untagged_vlan', form.cleaned_data.keys())
self.assertNotIn('tagged_vlans', form.cleaned_data.keys())
self.assertNotIn('qinq_svlan', form.cleaned_data.keys())
def test_edit_interface_mode_tagged_all_invalid_data(self):
"""
Test that saving invalid interface mode and tagged/untagged vlans works properly
"""
data = {
'device': self.device.pk,
'name': 'Ethernet 1/7',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'mode': InterfaceModeChoices.MODE_TAGGED_ALL,
'tagged_vlans': [self.vlans[0].pk, self.vlans[1].pk, self.vlans[2].pk]
}
form = InterfaceForm(data)
self.assertTrue(form.is_valid())
self.assertIn('untagged_vlan', form.cleaned_data.keys())
self.assertNotIn('tagged_vlans', form.cleaned_data.keys())
self.assertNotIn('qinq_svlan', form.cleaned_data.keys())
def test_create_interface_mode_routed_invalid_data(self):
"""
Test that saving invalid interface mode (routed) and tagged/untagged vlans works properly
"""
data = {
'device': self.device.pk,
'name': 'ethernet1/6',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'mode': None,
'untagged_vlan': self.vlans[0].pk,
'tagged_vlans': [self.vlans[0].pk, self.vlans[1].pk, self.vlans[2].pk]
}
form = InterfaceCreateForm(data)
self.assertTrue(form.is_valid())
self.assertNotIn('untagged_vlan', form.cleaned_data.keys())
self.assertNotIn('tagged_vlans', form.cleaned_data.keys())
self.assertNotIn('qinq_svlan', form.cleaned_data.keys())
def test_edit_interface_mode_routed_invalid_data(self):
"""
Test that saving invalid interface mode (routed) and tagged/untagged vlans works properly
"""
data = {
'device': self.device.pk,
'name': 'Ethernet 1/7',
'type': InterfaceTypeChoices.TYPE_1GE_GBIC,
'mode': None,
'untagged_vlan': self.vlans[0].pk,
'tagged_vlans': [self.vlans[0].pk, self.vlans[1].pk, self.vlans[2].pk]
}
form = InterfaceForm(data)
self.assertTrue(form.is_valid())
self.assertNotIn('untagged_vlan', form.cleaned_data.keys())
self.assertNotIn('tagged_vlans', form.cleaned_data.keys())
self.assertNotIn('qinq_svlan', form.cleaned_data.keys())

View File

@ -465,7 +465,8 @@ class DeviceTestCase(TestCase):
device=device,
name='Power Outlet 1',
power_port=powerport,
feed_leg=PowerOutletFeedLegChoices.FEED_LEG_A
feed_leg=PowerOutletFeedLegChoices.FEED_LEG_A,
status=PowerOutletStatusChoices.STATUS_ENABLED,
)
self.assertEqual(poweroutlet.cf['cf1'], 'foo')

View File

@ -2513,6 +2513,7 @@ class PowerOutletTestCase(ViewTestCases.DeviceComponentViewTestCase):
'device': device.pk,
'name': 'Power Outlet X',
'type': PowerOutletTypeChoices.TYPE_IEC_C13,
'status': PowerOutletStatusChoices.STATUS_ENABLED,
'power_port': powerports[1].pk,
'feed_leg': PowerOutletFeedLegChoices.FEED_LEG_B,
'description': 'A power outlet',
@ -2523,6 +2524,7 @@ class PowerOutletTestCase(ViewTestCases.DeviceComponentViewTestCase):
'device': device.pk,
'name': 'Power Outlet [4-6]',
'type': PowerOutletTypeChoices.TYPE_IEC_C13,
'status': PowerOutletStatusChoices.STATUS_ENABLED,
'power_port': powerports[1].pk,
'feed_leg': PowerOutletFeedLegChoices.FEED_LEG_B,
'description': 'A power outlet',
@ -2531,6 +2533,7 @@ class PowerOutletTestCase(ViewTestCases.DeviceComponentViewTestCase):
cls.bulk_edit_data = {
'type': PowerOutletTypeChoices.TYPE_IEC_C15,
'status': PowerOutletStatusChoices.STATUS_ENABLED,
'power_port': powerports[1].pk,
'feed_leg': PowerOutletFeedLegChoices.FEED_LEG_B,
'description': 'New description',

View File

@ -1,10 +1,16 @@
from drf_spectacular.utils import extend_schema_field
from rest_framework import serializers
from core.models import ObjectType
from extras.models import Tag
from extras.models import Tag, TaggedItem
from netbox.api.exceptions import SerializerNotFound
from netbox.api.fields import ContentTypeField, RelatedObjectCountField
from netbox.api.serializers import ValidatedModelSerializer
from netbox.api.serializers import BaseModelSerializer, ValidatedModelSerializer
from utilities.api import get_serializer_for_model
__all__ = (
'TagSerializer',
'TaggedItemSerializer',
)
@ -25,3 +31,37 @@ class TagSerializer(ValidatedModelSerializer):
'tagged_items', 'created', 'last_updated',
]
brief_fields = ('id', 'url', 'display', 'name', 'slug', 'color', 'description')
class TaggedItemSerializer(BaseModelSerializer):
object_type = ContentTypeField(
source='content_type',
read_only=True
)
object = serializers.SerializerMethodField(
read_only=True
)
tag = TagSerializer(
nested=True,
read_only=True
)
class Meta:
model = TaggedItem
fields = [
'id', 'url', 'display', 'object_type', 'object_id', 'object', 'tag',
]
brief_fields = ('id', 'url', 'display', 'object_type', 'object_id', 'object', 'tag')
@extend_schema_field(serializers.JSONField())
def get_object(self, obj):
"""
Serialize a nested representation of the tagged object.
"""
try:
serializer = get_serializer_for_model(obj.content_object)
except SerializerNotFound:
return obj.object_repr
data = serializer(obj.content_object, nested=True, context={'request': self.context['request']}).data
return data

View File

@ -19,6 +19,7 @@ router.register('notifications', views.NotificationViewSet)
router.register('notification-groups', views.NotificationGroupViewSet)
router.register('subscriptions', views.SubscriptionViewSet)
router.register('tags', views.TagViewSet)
router.register('tagged-objects', views.TaggedItemViewSet)
router.register('image-attachments', views.ImageAttachmentViewSet)
router.register('journal-entries', views.JournalEntryViewSet)
router.register('config-contexts', views.ConfigContextViewSet)

View File

@ -6,6 +6,7 @@ from rest_framework import status
from rest_framework.decorators import action
from rest_framework.exceptions import PermissionDenied
from rest_framework.generics import RetrieveUpdateDestroyAPIView
from rest_framework.mixins import ListModelMixin, RetrieveModelMixin
from rest_framework.renderers import JSONRenderer
from rest_framework.response import Response
from rest_framework.routers import APIRootView
@ -20,7 +21,7 @@ from netbox.api.authentication import IsAuthenticatedOrLoginNotRequired
from netbox.api.features import SyncedDataMixin
from netbox.api.metadata import ContentTypeMetadata
from netbox.api.renderers import TextRenderer
from netbox.api.viewsets import NetBoxModelViewSet
from netbox.api.viewsets import BaseViewSet, NetBoxModelViewSet
from utilities.exceptions import RQWorkerNotRunningException
from utilities.request import copy_safe_request
from . import serializers
@ -172,6 +173,12 @@ class TagViewSet(NetBoxModelViewSet):
filterset_class = filtersets.TagFilterSet
class TaggedItemViewSet(RetrieveModelMixin, ListModelMixin, BaseViewSet):
queryset = TaggedItem.objects.prefetch_related('content_type', 'content_object', 'tag')
serializer_class = serializers.TaggedItemSerializer
filterset_class = filtersets.TaggedItemFilterSet
#
# Image attachments
#

View File

@ -212,23 +212,6 @@ class WebhookHttpMethodChoices(ChoiceSet):
)
#
# Staging
#
class ChangeActionChoices(ChoiceSet):
ACTION_CREATE = 'create'
ACTION_UPDATE = 'update'
ACTION_DELETE = 'delete'
CHOICES = (
(ACTION_CREATE, _('Create'), 'green'),
(ACTION_UPDATE, _('Update'), 'blue'),
(ACTION_DELETE, _('Delete'), 'red'),
)
#
# Dashboard widgets
#

View File

@ -17,6 +17,7 @@ from core.models import ObjectType
from extras.choices import BookmarkOrderingChoices
from utilities.object_types import object_type_identifier, object_type_name
from utilities.permissions import get_permission_for_model
from utilities.proxy import resolve_proxies
from utilities.querydict import dict_to_querydict
from utilities.templatetags.builtins.filters import render_markdown
from utilities.views import get_viewname
@ -330,7 +331,7 @@ class RSSFeedWidget(DashboardWidget):
response = requests.get(
url=self.config['feed_url'],
headers={'User-Agent': f'NetBox/{settings.RELEASE.version}'},
proxies=settings.HTTP_PROXIES,
proxies=resolve_proxies(url=self.config['feed_url'], context={'client': self}),
timeout=3
)
response.raise_for_status()

View File

@ -31,6 +31,7 @@ __all__ = (
'SavedFilterFilterSet',
'ScriptFilterSet',
'TagFilterSet',
'TaggedItemFilterSet',
'WebhookFilterSet',
)
@ -492,6 +493,41 @@ class TagFilterSet(ChangeLoggedModelFilterSet):
)
class TaggedItemFilterSet(BaseFilterSet):
q = django_filters.CharFilter(
method='search',
label=_('Search'),
)
object_type = ContentTypeFilter(
field_name='content_type'
)
object_type_id = django_filters.ModelMultipleChoiceFilter(
queryset=ContentType.objects.all(),
field_name='content_type_id'
)
tag_id = django_filters.ModelMultipleChoiceFilter(
queryset=Tag.objects.all()
)
tag = django_filters.ModelMultipleChoiceFilter(
field_name='tag__slug',
queryset=Tag.objects.all(),
to_field_name='slug',
)
class Meta:
model = TaggedItem
fields = ('id', 'object_id')
def search(self, queryset, name, value):
if not value.strip():
return queryset
return queryset.filter(
Q(tag__name__icontains=value) |
Q(tag__slug__icontains=value) |
Q(tag__description__icontains=value)
)
class ConfigContextFilterSet(ChangeLoggedModelFilterSet):
q = django_filters.CharFilter(
method='search',

View File

@ -11,6 +11,7 @@ from packaging import version
from core.models import Job, ObjectChange
from netbox.config import Config
from utilities.proxy import resolve_proxies
class Command(BaseCommand):
@ -107,7 +108,7 @@ class Command(BaseCommand):
response = requests.get(
url=settings.RELEASE_CHECK_URL,
headers=headers,
proxies=settings.HTTP_PROXIES
proxies=resolve_proxies(url=settings.RELEASE_CHECK_URL)
)
response.raise_for_status()

View File

@ -0,0 +1,27 @@
# Generated by Django 5.1.5 on 2025-02-20 19:46
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('extras', '0122_charfield_null_choices'),
]
operations = [
migrations.RemoveField(
model_name='stagedchange',
name='branch',
),
migrations.RemoveField(
model_name='stagedchange',
name='object_type',
),
migrations.DeleteModel(
name='Branch',
),
migrations.DeleteModel(
name='StagedChange',
),
]

View File

@ -5,5 +5,4 @@ from .models import *
from .notifications import *
from .scripts import *
from .search import *
from .staging import *
from .tags import *

View File

@ -1,150 +0,0 @@
import logging
import warnings
from django.contrib.contenttypes.fields import GenericForeignKey
from django.db import models, transaction
from django.utils.translation import gettext_lazy as _
from mptt.models import MPTTModel
from extras.choices import ChangeActionChoices
from netbox.models import ChangeLoggedModel
from netbox.models.features import *
from utilities.serialization import deserialize_object
__all__ = (
'Branch',
'StagedChange',
)
logger = logging.getLogger('netbox.staging')
class Branch(ChangeLoggedModel):
"""
A collection of related StagedChanges.
"""
name = models.CharField(
verbose_name=_('name'),
max_length=100,
unique=True
)
description = models.CharField(
verbose_name=_('description'),
max_length=200,
blank=True
)
user = models.ForeignKey(
to='users.User',
on_delete=models.SET_NULL,
blank=True,
null=True
)
class Meta:
ordering = ('name',)
verbose_name = _('branch')
verbose_name_plural = _('branches')
def __init__(self, *args, **kwargs):
warnings.warn(
'The staged changes functionality has been deprecated and will be removed in a future release.',
DeprecationWarning
)
super().__init__(*args, **kwargs)
def __str__(self):
return f'{self.name} ({self.pk})'
def merge(self):
logger.info(f'Merging changes in branch {self}')
with transaction.atomic():
for change in self.staged_changes.all():
change.apply()
self.staged_changes.all().delete()
class StagedChange(CustomValidationMixin, EventRulesMixin, models.Model):
"""
The prepared creation, modification, or deletion of an object to be applied to the active database at a
future point.
"""
branch = models.ForeignKey(
to=Branch,
on_delete=models.CASCADE,
related_name='staged_changes'
)
action = models.CharField(
verbose_name=_('action'),
max_length=20,
choices=ChangeActionChoices
)
object_type = models.ForeignKey(
to='contenttypes.ContentType',
on_delete=models.CASCADE,
related_name='+'
)
object_id = models.PositiveBigIntegerField(
blank=True,
null=True
)
object = GenericForeignKey(
ct_field='object_type',
fk_field='object_id'
)
data = models.JSONField(
verbose_name=_('data'),
blank=True,
null=True
)
class Meta:
ordering = ('pk',)
indexes = (
models.Index(fields=('object_type', 'object_id')),
)
verbose_name = _('staged change')
verbose_name_plural = _('staged changes')
def __init__(self, *args, **kwargs):
warnings.warn(
'The staged changes functionality has been deprecated and will be removed in a future release.',
DeprecationWarning
)
super().__init__(*args, **kwargs)
def __str__(self):
action = self.get_action_display()
app_label, model_name = self.object_type.natural_key()
return f"{action} {app_label}.{model_name} ({self.object_id})"
@property
def model(self):
return self.object_type.model_class()
def apply(self):
"""
Apply the staged create/update/delete action to the database.
"""
if self.action == ChangeActionChoices.ACTION_CREATE:
instance = deserialize_object(self.model, self.data, pk=self.object_id)
logger.info(f'Creating {self.model._meta.verbose_name} {instance}')
instance.save()
if self.action == ChangeActionChoices.ACTION_UPDATE:
instance = deserialize_object(self.model, self.data, pk=self.object_id)
logger.info(f'Updating {self.model._meta.verbose_name} {instance}')
instance.save()
if self.action == ChangeActionChoices.ACTION_DELETE:
instance = self.model.objects.get(pk=self.object_id)
logger.info(f'Deleting {self.model._meta.verbose_name} {instance}')
instance.delete()
# Rebuild the MPTT tree where applicable
if issubclass(self.model, MPTTModel):
self.model.objects.rebuild()
apply.alters_data = True
def get_action_color(self):
return ChangeActionChoices.colors.get(self.action)

View File

@ -9,6 +9,7 @@ from netbox.choices import ColorChoices
from netbox.models import ChangeLoggedModel
from netbox.models.features import CloningMixin, ExportTemplatesMixin
from utilities.fields import ColorField
from utilities.querysets import RestrictedQuerySet
__all__ = (
'Tag',
@ -72,6 +73,7 @@ class TaggedItem(GenericTaggedItemBase):
)
_netbox_private = True
objects = RestrictedQuerySet.as_manager()
class Meta:
indexes = [models.Index(fields=["content_type", "object_id"])]

View File

@ -538,6 +538,34 @@ class TagTest(APIViewTestCases.APIViewTestCase):
Tag.objects.bulk_create(tags)
class TaggedItemTest(
APIViewTestCases.GetObjectViewTestCase,
APIViewTestCases.ListObjectsViewTestCase
):
model = TaggedItem
brief_fields = ['display', 'id', 'object', 'object_id', 'object_type', 'tag', 'url']
@classmethod
def setUpTestData(cls):
tags = (
Tag(name='Tag 1', slug='tag-1'),
Tag(name='Tag 2', slug='tag-2'),
Tag(name='Tag 3', slug='tag-3'),
)
Tag.objects.bulk_create(tags)
sites = (
Site(name='Site 1', slug='site-1'),
Site(name='Site 2', slug='site-2'),
Site(name='Site 3', slug='site-3'),
)
Site.objects.bulk_create(sites)
sites[0].tags.set([tags[0], tags[1]])
sites[1].tags.set([tags[1], tags[2]])
sites[2].tags.set([tags[2], tags[0]])
# TODO: Standardize to APIViewTestCase (needs create & update tests)
class ImageAttachmentTest(
APIViewTestCases.GetObjectViewTestCase,

View File

@ -1250,6 +1250,62 @@ class TagTestCase(TestCase, ChangeLoggedFilterSetTests):
)
class TaggedItemFilterSetTestCase(TestCase):
queryset = TaggedItem.objects.all()
filterset = TaggedItemFilterSet
@classmethod
def setUpTestData(cls):
tags = (
Tag(name='Tag 1', slug='tag-1'),
Tag(name='Tag 2', slug='tag-2'),
Tag(name='Tag 3', slug='tag-3'),
)
Tag.objects.bulk_create(tags)
sites = (
Site(name='Site 1', slug='site-1'),
Site(name='Site 2', slug='site-2'),
Site(name='Site 3', slug='site-3'),
)
Site.objects.bulk_create(sites)
sites[0].tags.add(tags[0])
sites[1].tags.add(tags[1])
sites[2].tags.add(tags[2])
tenants = (
Tenant(name='Tenant 1', slug='tenant-1'),
Tenant(name='Tenant 2', slug='tenant-2'),
Tenant(name='Tenant 3', slug='tenant-3'),
)
Tenant.objects.bulk_create(tenants)
tenants[0].tags.add(tags[0])
tenants[1].tags.add(tags[1])
tenants[2].tags.add(tags[2])
def test_tag(self):
tags = Tag.objects.all()[:2]
params = {'tag': [tags[0].slug, tags[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
params = {'tag_id': [tags[0].pk, tags[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 4)
def test_object_type(self):
object_type = ObjectType.objects.get_for_model(Site)
params = {'object_type': 'dcim.site'}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
params = {'object_type_id': [object_type.pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
def test_object_id(self):
site_ids = Site.objects.values_list('pk', flat=True)
params = {
'object_type': 'dcim.site',
'object_id': site_ids[:2],
}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 2)
class ChangeLoggedFilterSetTestCase(TestCase):
"""
Evaluate base ChangeLoggedFilterSet filters using the Site model.

View File

@ -3,10 +3,10 @@ import hmac
import logging
import requests
from django.conf import settings
from django_rq import job
from jinja2.exceptions import TemplateError
from utilities.proxy import resolve_proxies
from .constants import WEBHOOK_EVENT_TYPES
logger = logging.getLogger('netbox.webhooks')
@ -63,9 +63,10 @@ def send_webhook(event_rule, model_name, event_type, data, timestamp, username,
raise e
# Prepare the HTTP request
url = webhook.render_payload_url(context)
params = {
'method': webhook.http_method,
'url': webhook.render_payload_url(context),
'url': url,
'headers': headers,
'data': body.encode('utf8'),
}
@ -88,7 +89,8 @@ def send_webhook(event_rule, model_name, event_type, data, timestamp, username,
session.verify = webhook.ssl_verification
if webhook.ca_file_path:
session.verify = webhook.ca_file_path
response = session.send(prepared_request, proxies=settings.HTTP_PROXIES)
proxies = resolve_proxies(url=url, context={'client': webhook})
response = session.send(prepared_request, proxies=proxies)
if 200 <= response.status_code <= 299:
logger.info(f"Request succeeded; response status {response.status_code}")

View File

@ -37,6 +37,7 @@ class VLANGroupSerializer(NetBoxModelSerializer):
scope = serializers.SerializerMethodField(read_only=True)
vid_ranges = IntegerRangeSerializer(many=True, required=False)
utilization = serializers.CharField(read_only=True)
tenant = TenantSerializer(nested=True, required=False, allow_null=True)
# Related object counts
vlan_count = RelatedObjectCountField('vlans')
@ -45,7 +46,7 @@ class VLANGroupSerializer(NetBoxModelSerializer):
model = VLANGroup
fields = [
'id', 'url', 'display_url', 'display', 'name', 'slug', 'scope_type', 'scope_id', 'scope', 'vid_ranges',
'description', 'tags', 'custom_fields', 'created', 'last_updated', 'vlan_count', 'utilization'
'tenant', 'description', 'tags', 'custom_fields', 'created', 'last_updated', 'vlan_count', 'utilization'
]
brief_fields = ('id', 'url', 'display', 'name', 'slug', 'description', 'vlan_count')
validators = []

View File

@ -857,7 +857,7 @@ class FHRPGroupAssignmentFilterSet(ChangeLoggedModelFilterSet):
)
class VLANGroupFilterSet(OrganizationalModelFilterSet):
class VLANGroupFilterSet(OrganizationalModelFilterSet, TenancyFilterSet):
scope_type = ContentTypeFilter()
region = django_filters.NumberFilter(
method='filter_scope'

View File

@ -430,11 +430,17 @@ class VLANGroupBulkEditForm(NetBoxModelBulkEditForm):
label=_('VLAN ID ranges'),
required=False
)
tenant = DynamicModelChoiceField(
label=_('Tenant'),
queryset=Tenant.objects.all(),
required=False
)
model = VLANGroup
fieldsets = (
FieldSet('site', 'vid_ranges', 'description'),
FieldSet('scope_type', 'scope', name=_('Scope')),
FieldSet('tenant', name=_('Tenancy')),
)
nullable_fields = ('description', 'scope')

View File

@ -438,10 +438,17 @@ class VLANGroupImportForm(NetBoxModelImportForm):
vid_ranges = NumericRangeArrayField(
required=False
)
tenant = CSVModelChoiceField(
label=_('Tenant'),
queryset=Tenant.objects.all(),
required=False,
to_field_name='name',
help_text=_('Assigned tenant')
)
class Meta:
model = VLANGroup
fields = ('name', 'slug', 'scope_type', 'scope_id', 'vid_ranges', 'description', 'tags')
fields = ('name', 'slug', 'scope_type', 'scope_id', 'vid_ranges', 'tenant', 'description', 'tags')
labels = {
'scope_id': 'Scope ID',
}

View File

@ -411,12 +411,13 @@ class FHRPGroupFilterForm(NetBoxModelFilterSetForm):
tag = TagFilterField(model)
class VLANGroupFilterForm(NetBoxModelFilterSetForm):
class VLANGroupFilterForm(TenancyFilterForm, NetBoxModelFilterSetForm):
fieldsets = (
FieldSet('q', 'filter_id', 'tag'),
FieldSet('region', 'sitegroup', 'site', 'location', 'rack', name=_('Location')),
FieldSet('cluster_group', 'cluster', name=_('Cluster')),
FieldSet('contains_vid', name=_('VLANs')),
FieldSet('tenant_group_id', 'tenant_id', name=_('Tenant')),
)
model = VLANGroup
region = DynamicModelMultipleChoiceField(

View File

@ -598,7 +598,7 @@ class FHRPGroupAssignmentForm(forms.ModelForm):
return group
class VLANGroupForm(NetBoxModelForm):
class VLANGroupForm(TenancyForm, NetBoxModelForm):
slug = SlugField()
vid_ranges = NumericRangeArrayField(
label=_('VLAN IDs')
@ -621,12 +621,13 @@ class VLANGroupForm(NetBoxModelForm):
FieldSet('name', 'slug', 'description', 'tags', name=_('VLAN Group')),
FieldSet('vid_ranges', name=_('Child VLANs')),
FieldSet('scope_type', 'scope', name=_('Scope')),
FieldSet('tenant_group', 'tenant', name=_('Tenancy')),
)
class Meta:
model = VLANGroup
fields = [
'name', 'slug', 'description', 'vid_ranges', 'scope_type', 'tags',
'name', 'slug', 'description', 'vid_ranges', 'scope_type', 'tenant_group', 'tenant', 'tags',
]
def __init__(self, *args, **kwargs):

View File

@ -266,6 +266,7 @@ class VLANGroupType(OrganizationalObjectType):
vlans: List[VLANType]
vid_ranges: List[str]
tenant: Annotated["TenantType", strawberry.lazy('tenancy.graphql.types')] | None
@strawberry_django.field
def scope(self) -> Annotated[Union[

View File

@ -0,0 +1,26 @@
# Generated by Django 5.1.3 on 2025-02-20 17:49
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('ipam', '0076_natural_ordering'),
('tenancy', '0017_natural_ordering'),
]
operations = [
migrations.AddField(
model_name='vlangroup',
name='tenant',
field=models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.PROTECT,
related_name='vlan_groups',
to='tenancy.tenant',
),
),
]

View File

@ -62,6 +62,13 @@ class VLANGroup(OrganizationalModel):
verbose_name=_('VLAN ID ranges'),
default=default_vid_ranges
)
tenant = models.ForeignKey(
to='tenancy.Tenant',
on_delete=models.PROTECT,
related_name='vlan_groups',
blank=True,
null=True
)
_total_vlan_ids = models.PositiveBigIntegerField(
default=VLAN_VID_MAX - VLAN_VID_MIN + 1
)

View File

@ -28,7 +28,7 @@ AVAILABLE_LABEL = mark_safe('<span class="badge text-bg-success">Available</span
# VLAN groups
#
class VLANGroupTable(NetBoxTable):
class VLANGroupTable(TenancyColumnsMixin, NetBoxTable):
name = tables.Column(
verbose_name=_('Name'),
linkify=True
@ -65,9 +65,11 @@ class VLANGroupTable(NetBoxTable):
model = VLANGroup
fields = (
'pk', 'id', 'name', 'scope_type', 'scope', 'vid_ranges_list', 'vlan_count', 'slug', 'description',
'tags', 'created', 'last_updated', 'actions', 'utilization',
'tenant', 'tenant_group', 'tags', 'created', 'last_updated', 'actions', 'utilization',
)
default_columns = (
'pk', 'name', 'scope_type', 'scope', 'vlan_count', 'utilization', 'tenant', 'description'
)
default_columns = ('pk', 'name', 'scope_type', 'scope', 'vlan_count', 'utilization', 'description')
#

View File

@ -1568,27 +1568,45 @@ class VLANGroupTestCase(TestCase, ChangeLoggedFilterSetTests):
cluster = Cluster(name='Cluster 1', type=clustertype)
cluster.save()
tenant_groups = (
TenantGroup(name='Tenant group 1', slug='tenant-group-1'),
TenantGroup(name='Tenant group 2', slug='tenant-group-2'),
TenantGroup(name='Tenant group 3', slug='tenant-group-3'),
)
for tenantgroup in tenant_groups:
tenantgroup.save()
tenants = (
Tenant(name='Tenant 1', slug='tenant-1', group=tenant_groups[0]),
Tenant(name='Tenant 2', slug='tenant-2', group=tenant_groups[1]),
Tenant(name='Tenant 3', slug='tenant-3', group=tenant_groups[2]),
)
Tenant.objects.bulk_create(tenants)
vlan_groups = (
VLANGroup(
name='VLAN Group 1',
slug='vlan-group-1',
vid_ranges=[NumericRange(1, 11), NumericRange(100, 200)],
scope=region,
description='foobar1'
description='foobar1',
tenant=tenants[0]
),
VLANGroup(
name='VLAN Group 2',
slug='vlan-group-2',
vid_ranges=[NumericRange(1, 11), NumericRange(200, 300)],
scope=sitegroup,
description='foobar2'
description='foobar2',
tenant=tenants[1]
),
VLANGroup(
name='VLAN Group 3',
slug='vlan-group-3',
vid_ranges=[NumericRange(1, 11), NumericRange(300, 400)],
scope=site,
description='foobar3'
description='foobar3',
tenant=tenants[1]
),
VLANGroup(
name='VLAN Group 4',
@ -1671,6 +1689,20 @@ class VLANGroupTestCase(TestCase, ChangeLoggedFilterSetTests):
params = {'cluster': Cluster.objects.first().pk}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 1)
def test_tenant(self):
tenants = Tenant.objects.all()[:2]
params = {'tenant_id': [tenants[0].pk, tenants[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
params = {'tenant': [tenants[0].slug, tenants[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
def test_tenant_group(self):
tenant_groups = TenantGroup.objects.all()[:2]
params = {'tenant_group_id': [tenant_groups[0].pk, tenant_groups[1].pk]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
params = {'tenant_group': [tenant_groups[0].slug, tenant_groups[1].slug]}
self.assertEqual(self.filterset(params, self.queryset).qs.count(), 3)
class VLANTestCase(TestCase, ChangeLoggedFilterSetTests):
queryset = VLAN.objects.all()

View File

@ -4,15 +4,15 @@ from django import __version__ as DJANGO_VERSION
from django.apps import apps
from django.conf import settings
from django_rq.queues import get_connection
from drf_spectacular.utils import extend_schema
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import extend_schema
from rest_framework.response import Response
from rest_framework.reverse import reverse
from rest_framework.views import APIView
from rq.worker import Worker
from netbox.plugins.utils import get_installed_plugins
from netbox.api.authentication import IsAuthenticatedOrLoginNotRequired
from netbox.plugins.utils import get_installed_plugins
class APIRootView(APIView):
@ -66,7 +66,8 @@ class StatusView(APIView):
return Response({
'django-version': DJANGO_VERSION,
'installed-apps': installed_apps,
'netbox-version': settings.RELEASE.full_version,
'netbox-version': settings.RELEASE.version,
'netbox-full-version': settings.RELEASE.full_version,
'plugins': get_installed_plugins(),
'python-version': platform.python_version(),
'rq-workers-running': Worker.count(get_connection('default')),

View File

@ -5,6 +5,7 @@ from functools import cached_property
from django.contrib.contenttypes.fields import GenericRelation
from django.core.validators import ValidationError
from django.db import models
from django.db.models import Q
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from taggit.managers import TaggableManager
@ -363,6 +364,26 @@ class ContactsMixin(models.Model):
class Meta:
abstract = True
def get_contacts(self, inherited=True):
"""
Return a `QuerySet` matching all contacts assigned to this object.
:param inherited: If `True`, inherited contacts from parent objects are included.
"""
from tenancy.models import ContactAssignment
from . import NestedGroupModel
filter = Q(
object_type=ObjectType.objects.get_for_model(self),
object_id__in=(
self.get_ancestors(include_self=True)
if (isinstance(self, NestedGroupModel) and inherited)
else [self.pk]
),
)
return ContactAssignment.objects.filter(filter)
class BookmarksMixin(models.Model):
"""

View File

@ -1,7 +1,7 @@
import inspect
import warnings
from django.utils.translation import gettext_lazy as _
from netbox.registry import registry
from .navigation import PluginMenu, PluginMenuButton, PluginMenuItem
from .templates import PluginTemplateExtension
@ -35,16 +35,8 @@ def register_template_extensions(class_list):
)
if template_extension.models:
# Registration for multiple models
# Registration for specific models
models = template_extension.models
elif template_extension.model:
# Registration for a single model (deprecated)
warnings.warn(
"PluginTemplateExtension.model is deprecated and will be removed in a future release. Use "
"'models' instead.",
DeprecationWarning
)
models = [template_extension.model]
else:
# Global registration (no specific models)
models = [None]

View File

@ -11,8 +11,14 @@ class PluginTemplateExtension:
This class is used to register plugin content to be injected into core NetBox templates. It contains methods
that are overridden by plugin authors to return template content.
The `model` attribute on the class defines the which model detail page this class renders content for. It
should be set as a string in the form '<app_label>.<model_name>'. render() provides the following context data:
The `models` attribute on the class defines the which specific model detail pages this class renders content
for. It should be defined as a list of strings in the following form:
models = ['<app_label>.<model_name>', '<app_label>.<model_name>']
If `models` is left as None, the extension will render for _all_ models.
The `render()` method provides the following context data:
* object - The object being viewed (object views only)
* model - The type of object being viewed (list views only)
@ -21,7 +27,6 @@ class PluginTemplateExtension:
* config - Plugin-specific configuration parameters
"""
models = None
model = None # Deprecated; use `models` instead
def __init__(self, context):
self.context = context

View File

@ -9,6 +9,7 @@ import warnings
from django.contrib.messages import constants as messages
from django.core.exceptions import ImproperlyConfigured, ValidationError
from django.core.validators import URLValidator
from django.utils.module_loading import import_string
from django.utils.translation import gettext_lazy as _
from netbox.config import PARAMS as CONFIG_PARAMS
@ -117,7 +118,7 @@ EXEMPT_VIEW_PERMISSIONS = getattr(configuration, 'EXEMPT_VIEW_PERMISSIONS', [])
FIELD_CHOICES = getattr(configuration, 'FIELD_CHOICES', {})
FILE_UPLOAD_MAX_MEMORY_SIZE = getattr(configuration, 'FILE_UPLOAD_MAX_MEMORY_SIZE', 2621440)
GRAPHQL_MAX_ALIASES = getattr(configuration, 'GRAPHQL_MAX_ALIASES', 10)
HTTP_PROXIES = getattr(configuration, 'HTTP_PROXIES', None)
HTTP_PROXIES = getattr(configuration, 'HTTP_PROXIES', {})
INTERNAL_IPS = getattr(configuration, 'INTERNAL_IPS', ('127.0.0.1', '::1'))
ISOLATED_DEPLOYMENT = getattr(configuration, 'ISOLATED_DEPLOYMENT', False)
JINJA2_FILTERS = getattr(configuration, 'JINJA2_FILTERS', {})
@ -132,6 +133,7 @@ MEDIA_ROOT = getattr(configuration, 'MEDIA_ROOT', os.path.join(BASE_DIR, 'media'
METRICS_ENABLED = getattr(configuration, 'METRICS_ENABLED', False)
PLUGINS = getattr(configuration, 'PLUGINS', [])
PLUGINS_CONFIG = getattr(configuration, 'PLUGINS_CONFIG', {})
PROXY_ROUTERS = getattr(configuration, 'PROXY_ROUTERS', ['utilities.proxy.DefaultProxyRouter'])
QUEUE_MAPPINGS = getattr(configuration, 'QUEUE_MAPPINGS', {})
REDIS = getattr(configuration, 'REDIS') # Required
RELEASE_CHECK_URL = getattr(configuration, 'RELEASE_CHECK_URL', None)
@ -203,6 +205,14 @@ if RELEASE_CHECK_URL:
"RELEASE_CHECK_URL must be a valid URL. Example: https://api.github.com/repos/netbox-community/netbox"
)
# Validate configured proxy routers
for path in PROXY_ROUTERS:
if type(path) is str:
try:
import_string(path)
except ImportError:
raise ImproperlyConfigured(f"Invalid path in PROXY_ROUTERS: {path}")
#
# Database
@ -577,6 +587,7 @@ if SENTRY_ENABLED:
sample_rate=SENTRY_SAMPLE_RATE,
traces_sample_rate=SENTRY_TRACES_SAMPLE_RATE,
send_default_pii=SENTRY_SEND_DEFAULT_PII,
# TODO: Support proxy routing
http_proxy=HTTP_PROXIES.get('http') if HTTP_PROXIES else None,
https_proxy=HTTP_PROXIES.get('https') if HTTP_PROXIES else None
)

View File

@ -1,148 +0,0 @@
import logging
from django.contrib.contenttypes.models import ContentType
from django.db import transaction
from django.db.models.signals import m2m_changed, pre_delete, post_save
from extras.choices import ChangeActionChoices
from extras.models import StagedChange
from utilities.serialization import serialize_object
logger = logging.getLogger('netbox.staging')
class checkout:
"""
Context manager for staging changes to NetBox objects. Staged changes are saved out-of-band
(as Change instances) for application at a later time, without modifying the production
database.
branch = Branch.objects.create(name='my-branch')
with checkout(branch):
# All changes made herein will be rolled back and stored for later
Note that invoking the context disabled transaction autocommit to facilitate manual rollbacks,
and restores its original value upon exit.
"""
def __init__(self, branch):
self.branch = branch
self.queue = {}
def __enter__(self):
# Disable autocommit to effect a new transaction
logger.debug(f"Entering transaction for {self.branch}")
self._autocommit = transaction.get_autocommit()
transaction.set_autocommit(False)
# Apply any existing Changes assigned to this Branch
staged_changes = self.branch.staged_changes.all()
if change_count := staged_changes.count():
logger.debug(f"Applying {change_count} pre-staged changes...")
for change in staged_changes:
change.apply()
else:
logger.debug("No pre-staged changes found")
# Connect signal handlers
logger.debug("Connecting signal handlers")
post_save.connect(self.post_save_handler)
m2m_changed.connect(self.post_save_handler)
pre_delete.connect(self.pre_delete_handler)
def __exit__(self, exc_type, exc_val, exc_tb):
# Disconnect signal handlers
logger.debug("Disconnecting signal handlers")
post_save.disconnect(self.post_save_handler)
m2m_changed.disconnect(self.post_save_handler)
pre_delete.disconnect(self.pre_delete_handler)
# Roll back the transaction to return the database to its original state
logger.debug("Rolling back database transaction")
transaction.rollback()
logger.debug(f"Restoring autocommit state ({self._autocommit})")
transaction.set_autocommit(self._autocommit)
# Process queued changes
self.process_queue()
#
# Queuing
#
@staticmethod
def get_key_for_instance(instance):
return ContentType.objects.get_for_model(instance), instance.pk
def process_queue(self):
"""
Create Change instances for all actions stored in the queue.
"""
if not self.queue:
logger.debug("No queued changes; aborting")
return
logger.debug(f"Processing {len(self.queue)} queued changes")
# Iterate through the in-memory queue, creating Change instances
changes = []
for key, change in self.queue.items():
logger.debug(f' {key}: {change}')
object_type, pk = key
action, data = change
changes.append(StagedChange(
branch=self.branch,
action=action,
object_type=object_type,
object_id=pk,
data=data
))
# Save all Change instances to the database
StagedChange.objects.bulk_create(changes)
#
# Signal handlers
#
def post_save_handler(self, sender, instance, **kwargs):
"""
Hooks to the post_save signal when a branch is active to queue create and update actions.
"""
key = self.get_key_for_instance(instance)
object_type = instance._meta.verbose_name
# Creating a new object
if kwargs.get('created'):
logger.debug(f"[{self.branch}] Staging creation of {object_type} {instance} (PK: {instance.pk})")
data = serialize_object(instance, resolve_tags=False)
self.queue[key] = (ChangeActionChoices.ACTION_CREATE, data)
return
# Ignore pre_* many-to-many actions
if 'action' in kwargs and kwargs['action'] not in ('post_add', 'post_remove', 'post_clear'):
return
# Object has already been created/updated in the queue; update its queued representation
if key in self.queue:
logger.debug(f"[{self.branch}] Updating staged value for {object_type} {instance} (PK: {instance.pk})")
data = serialize_object(instance, resolve_tags=False)
self.queue[key] = (self.queue[key][0], data)
return
# Modifying an existing object for the first time
logger.debug(f"[{self.branch}] Staging changes to {object_type} {instance} (PK: {instance.pk})")
data = serialize_object(instance, resolve_tags=False)
self.queue[key] = (ChangeActionChoices.ACTION_UPDATE, data)
def pre_delete_handler(self, sender, instance, **kwargs):
"""
Hooks to the pre_delete signal when a branch is active to queue delete actions.
"""
key = self.get_key_for_instance(instance)
object_type = instance._meta.verbose_name
# Delete an existing object
logger.debug(f"[{self.branch}] Staging deletion of {object_type} {instance} (PK: {instance.pk})")
self.queue[key] = (ChangeActionChoices.ACTION_DELETE, None)

View File

@ -1,216 +0,0 @@
from django.db.models.signals import post_save
from django.test import TransactionTestCase
from circuits.models import Provider, Circuit, CircuitType
from extras.choices import ChangeActionChoices
from extras.models import Branch, StagedChange, Tag
from ipam.models import ASN, RIR
from netbox.search.backends import search_backend
from netbox.staging import checkout
from utilities.testing import create_tags
class StagingTestCase(TransactionTestCase):
def setUp(self):
# Disconnect search backend to avoid issues with cached ObjectTypes being deleted
# from the database upon transaction rollback
post_save.disconnect(search_backend.caching_handler)
create_tags('Alpha', 'Bravo', 'Charlie')
rir = RIR.objects.create(name='RIR 1', slug='rir-1')
asns = (
ASN(asn=65001, rir=rir),
ASN(asn=65002, rir=rir),
ASN(asn=65003, rir=rir),
)
ASN.objects.bulk_create(asns)
providers = (
Provider(name='Provider A', slug='provider-a'),
Provider(name='Provider B', slug='provider-b'),
Provider(name='Provider C', slug='provider-c'),
)
Provider.objects.bulk_create(providers)
circuit_type = CircuitType.objects.create(name='Circuit Type 1', slug='circuit-type-1')
Circuit.objects.bulk_create((
Circuit(provider=providers[0], cid='Circuit A1', type=circuit_type),
Circuit(provider=providers[0], cid='Circuit A2', type=circuit_type),
Circuit(provider=providers[0], cid='Circuit A3', type=circuit_type),
Circuit(provider=providers[1], cid='Circuit B1', type=circuit_type),
Circuit(provider=providers[1], cid='Circuit B2', type=circuit_type),
Circuit(provider=providers[1], cid='Circuit B3', type=circuit_type),
Circuit(provider=providers[2], cid='Circuit C1', type=circuit_type),
Circuit(provider=providers[2], cid='Circuit C2', type=circuit_type),
Circuit(provider=providers[2], cid='Circuit C3', type=circuit_type),
))
def test_object_creation(self):
branch = Branch.objects.create(name='Branch 1')
tags = Tag.objects.all()
asns = ASN.objects.all()
with checkout(branch):
provider = Provider.objects.create(name='Provider D', slug='provider-d')
provider.asns.set(asns)
circuit = Circuit.objects.create(provider=provider, cid='Circuit D1', type=CircuitType.objects.first())
circuit.tags.set(tags)
# Sanity-checking
self.assertEqual(Provider.objects.count(), 4)
self.assertListEqual(list(provider.asns.all()), list(asns))
self.assertEqual(Circuit.objects.count(), 10)
self.assertListEqual(list(circuit.tags.all()), list(tags))
# Verify that changes have been rolled back after exiting the context
self.assertEqual(Provider.objects.count(), 3)
self.assertEqual(Circuit.objects.count(), 9)
self.assertEqual(StagedChange.objects.count(), 5)
# Verify that changes are replayed upon entering the context
with checkout(branch):
self.assertEqual(Provider.objects.count(), 4)
self.assertEqual(Circuit.objects.count(), 10)
provider = Provider.objects.get(name='Provider D')
self.assertListEqual(list(provider.asns.all()), list(asns))
circuit = Circuit.objects.get(cid='Circuit D1')
self.assertListEqual(list(circuit.tags.all()), list(tags))
# Verify that changes are applied and deleted upon branch merge
branch.merge()
self.assertEqual(Provider.objects.count(), 4)
self.assertEqual(Circuit.objects.count(), 10)
provider = Provider.objects.get(name='Provider D')
self.assertListEqual(list(provider.asns.all()), list(asns))
circuit = Circuit.objects.get(cid='Circuit D1')
self.assertListEqual(list(circuit.tags.all()), list(tags))
self.assertEqual(StagedChange.objects.count(), 0)
def test_object_modification(self):
branch = Branch.objects.create(name='Branch 1')
tags = Tag.objects.all()
asns = ASN.objects.all()
with checkout(branch):
provider = Provider.objects.get(name='Provider A')
provider.name = 'Provider X'
provider.save()
provider.asns.set(asns)
circuit = Circuit.objects.get(cid='Circuit A1')
circuit.cid = 'Circuit X'
circuit.save()
circuit.tags.set(tags)
# Sanity-checking
self.assertEqual(Provider.objects.count(), 3)
self.assertEqual(Provider.objects.get(pk=provider.pk).name, 'Provider X')
self.assertListEqual(list(provider.asns.all()), list(asns))
self.assertEqual(Circuit.objects.count(), 9)
self.assertEqual(Circuit.objects.get(pk=circuit.pk).cid, 'Circuit X')
self.assertListEqual(list(circuit.tags.all()), list(tags))
# Verify that changes have been rolled back after exiting the context
self.assertEqual(Provider.objects.count(), 3)
self.assertEqual(Provider.objects.get(pk=provider.pk).name, 'Provider A')
provider = Provider.objects.get(pk=provider.pk)
self.assertListEqual(list(provider.asns.all()), [])
self.assertEqual(Circuit.objects.count(), 9)
circuit = Circuit.objects.get(pk=circuit.pk)
self.assertEqual(circuit.cid, 'Circuit A1')
self.assertListEqual(list(circuit.tags.all()), [])
self.assertEqual(StagedChange.objects.count(), 5)
# Verify that changes are replayed upon entering the context
with checkout(branch):
self.assertEqual(Provider.objects.count(), 3)
self.assertEqual(Provider.objects.get(pk=provider.pk).name, 'Provider X')
provider = Provider.objects.get(pk=provider.pk)
self.assertListEqual(list(provider.asns.all()), list(asns))
self.assertEqual(Circuit.objects.count(), 9)
circuit = Circuit.objects.get(pk=circuit.pk)
self.assertEqual(circuit.cid, 'Circuit X')
self.assertListEqual(list(circuit.tags.all()), list(tags))
# Verify that changes are applied and deleted upon branch merge
branch.merge()
self.assertEqual(Provider.objects.count(), 3)
self.assertEqual(Provider.objects.get(pk=provider.pk).name, 'Provider X')
provider = Provider.objects.get(pk=provider.pk)
self.assertListEqual(list(provider.asns.all()), list(asns))
self.assertEqual(Circuit.objects.count(), 9)
circuit = Circuit.objects.get(pk=circuit.pk)
self.assertEqual(circuit.cid, 'Circuit X')
self.assertListEqual(list(circuit.tags.all()), list(tags))
self.assertEqual(StagedChange.objects.count(), 0)
def test_object_deletion(self):
branch = Branch.objects.create(name='Branch 1')
with checkout(branch):
provider = Provider.objects.get(name='Provider A')
provider.circuits.all().delete()
provider.delete()
# Sanity-checking
self.assertEqual(Provider.objects.count(), 2)
self.assertEqual(Circuit.objects.count(), 6)
# Verify that changes have been rolled back after exiting the context
self.assertEqual(Provider.objects.count(), 3)
self.assertEqual(Circuit.objects.count(), 9)
self.assertEqual(StagedChange.objects.count(), 4)
# Verify that changes are replayed upon entering the context
with checkout(branch):
self.assertEqual(Provider.objects.count(), 2)
self.assertEqual(Circuit.objects.count(), 6)
# Verify that changes are applied and deleted upon branch merge
branch.merge()
self.assertEqual(Provider.objects.count(), 2)
self.assertEqual(Circuit.objects.count(), 6)
self.assertEqual(StagedChange.objects.count(), 0)
def test_exit_enter_context(self):
branch = Branch.objects.create(name='Branch 1')
with checkout(branch):
# Create a new object
provider = Provider.objects.create(name='Provider D', slug='provider-d')
provider.save()
# Check that a create Change was recorded
self.assertEqual(StagedChange.objects.count(), 1)
change = StagedChange.objects.first()
self.assertEqual(change.action, ChangeActionChoices.ACTION_CREATE)
self.assertEqual(change.data['name'], provider.name)
with checkout(branch):
# Update the staged object
provider = Provider.objects.get(name='Provider D')
provider.comments = 'New comments'
provider.save()
# Check that a second Change object has been created for the object
self.assertEqual(StagedChange.objects.count(), 2)
change = StagedChange.objects.last()
self.assertEqual(change.action, ChangeActionChoices.ACTION_UPDATE)
self.assertEqual(change.data['name'], provider.name)
self.assertEqual(change.data['comments'], provider.comments)
with checkout(branch):
# Delete the staged object
provider = Provider.objects.get(name='Provider D')
provider.delete()
# Check that a third Change has recorded the object's deletion
self.assertEqual(StagedChange.objects.count(), 3)
change = StagedChange.objects.last()
self.assertEqual(change.action, ChangeActionChoices.ACTION_DELETE)
self.assertIsNone(change.data)

Binary file not shown.

View File

@ -23,7 +23,7 @@
},
"dependencies": {
"@mdi/font": "7.4.47",
"@tabler/core": "1.0.0-beta21",
"@tabler/core": "1.0.0",
"bootstrap": "5.3.3",
"clipboard": "2.0.11",
"flatpickr": "4.6.13",
@ -53,5 +53,6 @@
},
"resolutions": {
"@types/bootstrap/**/@popperjs/core": "^2.11.6"
}
},
"packageManager": "yarn@1.22.22+sha512.a6b2f7906b721bba3d67d4aff083df04dad64c399707841b7acf00f6b133b7ac24255f2652fa22ae3534329dc6180534e98d17432037ff6fd140556e2bb3137e"
}

View File

@ -2,7 +2,6 @@
// Set base fonts
$font-family-sans-serif: 'Inter', system-ui, sans-serif;
// See https://github.com/tabler/tabler/issues/1812
$font-family-monospace: 'Roboto Mono';
// Set the navigation sidebar width
@ -16,9 +15,6 @@ $btn-padding-y: 0.25rem;
$table-cell-padding-x: 0.5rem;
$table-cell-padding-y: 0.5rem;
// Fix Tabler bug #1694 in 1.0.0-beta20
$hover-bg: rgba(var(--tblr-secondary-rgb), 0.08);
// Ensure active nav-pill has a background color in dark mode
$nav-pills-link-active-bg: rgba(var(--tblr-secondary-rgb), 0.15);

View File

@ -8,8 +8,8 @@
// Adjust hover color & style for menu items
.navbar-collapse {
.nav-link-icon {
color: var(--tblr-nav-link-color) !important;
.nav-link-icon, .nav-link-title {
color: $rich-black;
}
.text-secondary {
color: $dark-teal !important;
@ -26,8 +26,8 @@
visibility: hidden;
}
// Style menu item hover state
&:hover {
// Style menu item hover/active state
&:hover, &.active {
background-color: var(--tblr-navbar-active-bg);
a {
text-decoration: none;
@ -37,17 +37,6 @@
}
}
// Style active menu item
&.active {
background-color: var(--tblr-navbar-active-bg);
a {
color: $rich-black;
}
.btn-group {
visibility: visible;
}
}
}
}
}
@ -109,22 +98,17 @@ body[data-bs-theme=dark] .navbar-vertical.navbar-expand-lg {
border-color: $bright-teal !important;
}
.nav-link-title, .nav-link-icon {
color: white !important;
}
// Adjust hover color & style for menu items
.dropdown-item {
a {
color: white !important;
}
&.active {
&.active, &:hover {
background-color: $navbar-dark-active-bg !important;
a {
color: white !important;
}
}
&:hover {
background-color: $navbar-dark-active-bg !important;
}
.nav-link-title {
color: white !important;
}
}
.text-secondary {

View File

@ -1,3 +1,8 @@
// Reduce column heading font size
.table thead th {
font-size: 0.625rem;
}
// Object list tables
table.object-list {

View File

@ -759,19 +759,19 @@
resolved "https://registry.yarnpkg.com/@rtsao/scc/-/scc-1.1.0.tgz#927dd2fae9bc3361403ac2c7a00c32ddce9ad7e8"
integrity sha512-zt6OdqaDoOnJ1ZYsCYGt9YmWzDXl4vQdKTyJev62gFhRGKdx7mcT54V9KIjg+d2wi9EXsPvAPKe7i7WjfVWB8g==
"@tabler/core@1.0.0-beta21":
version "1.0.0-beta21"
resolved "https://registry.yarnpkg.com/@tabler/core/-/core-1.0.0-beta21.tgz#cd10d7648b3b7b31927a430fd776d3304e796403"
integrity sha512-9ZKu38BScc0eHruhX/SlVDSiXenBFSgBp2WDq6orkuC8J/1yutKDt7CdXuJpBwkiADEk5yqYV31Ku+CnhwOc3Q==
"@tabler/core@1.0.0":
version "1.0.0"
resolved "https://registry.yarnpkg.com/@tabler/core/-/core-1.0.0.tgz#08736378108663b5893a31ad462be7d12e64be67"
integrity sha512-uFmv6f8TAaW2JaGwzjT1LfK+TjmBQSTCoznCMdV5uur4cv4TtJlV8Hh1Beu55YX0svMtOQ0Xts7tYv/+qBEcfA==
dependencies:
"@popperjs/core" "^2.11.8"
"@tabler/icons" "^3.14.0"
"@tabler/icons" "^3.29.0"
bootstrap "5.3.3"
"@tabler/icons@^3.14.0":
version "3.16.0"
resolved "https://registry.yarnpkg.com/@tabler/icons/-/icons-3.16.0.tgz#d618670b80163925a31a6c2290e8775f6058d81a"
integrity sha512-GU7MSx4uQEr55BmyON6hD/QYTl6k1v0YlRhM91gBWDoKAbyCt6QIYw7rpJ/ecdh5zrHaTOJKPenZ4+luoutwFA==
"@tabler/icons@^3.29.0":
version "3.30.0"
resolved "https://registry.yarnpkg.com/@tabler/icons/-/icons-3.30.0.tgz#4f80f52cc6355b440a4ee0dadd4c3e3775e50663"
integrity sha512-c8OKLM48l00u9TFbh2qhSODMONIzML8ajtCyq95rW8vzkWcBrKRPM61tdkThz2j4kd5u17srPGIjqdeRUZdfdw==
"@tanstack/react-virtual@^3.0.0-beta.60":
version "3.5.0"

View File

@ -46,6 +46,10 @@
<th scope="row">{% trans "Status" %}</th>
<td>{% badge object.get_status_display bg_color=object.get_status_color %}</td>
</tr>
<tr>
<th scope="row">{% trans "Sync interval" %}</th>
<td>{{ object.get_sync_interval_display|placeholder }}</td>
</tr>
<tr>
<th scope="row">{% trans "Last synced" %}</th>
<td>{{ object.last_synced|placeholder }}</td>

View File

@ -36,6 +36,10 @@
<th scope="row">{% trans "Type" %}</th>
<td>{{ object.get_type_display }}</td>
</tr>
<tr>
<th scope="row">{% trans "Status" %}</th>
<td>{% badge object.get_status_display bg_color=object.get_status_color %}</td>
</tr>
<tr>
<th scope="row">{% trans "Description" %}</th>
<td>{{ object.description|placeholder }}</td>

View File

@ -46,6 +46,15 @@
<th scope="row">Utilization</th>
<td>{% utilization_graph object.utilization %}</td>
</tr>
<tr>
<th scope="row">{% trans "Tenant" %}</th>
<td>
{% if object.tenant.group %}
{{ object.tenant.group|linkify }} /
{% endif %}
{{ object.tenant|linkify|placeholder }}
</td>
</tr>
</table>
</div>
{% include 'inc/panels/tags.html' %}

View File

@ -17,25 +17,13 @@ class ObjectContactsView(generic.ObjectChildrenView):
template_name = 'tenancy/object_contacts.html'
tab = ViewTab(
label=_('Contacts'),
badge=lambda obj: obj.contacts.count(),
badge=lambda obj: obj.get_contacts().count(),
permission='tenancy.view_contactassignment',
weight=5000
)
def get_children(self, request, parent):
return ContactAssignment.objects.restrict(request.user, 'view').filter(
object_type=ContentType.objects.get_for_model(parent),
object_id=parent.pk
).order_by('priority', 'contact', 'role')
def get_table(self, *args, **kwargs):
table = super().get_table(*args, **kwargs)
# Hide object columns
table.columns.hide('object_type')
table.columns.hide('object')
return table
return parent.get_contacts().restrict(request.user, 'view').order_by('priority', 'contact', 'role')
#

55
netbox/utilities/proxy.py Normal file
View File

@ -0,0 +1,55 @@
from django.conf import settings
from django.utils.module_loading import import_string
from urllib.parse import urlparse
__all__ = (
'DefaultProxyRouter',
'resolve_proxies',
)
class DefaultProxyRouter:
"""
Base class for a proxy router.
"""
@staticmethod
def _get_protocol_from_url(url):
"""
Determine the applicable protocol (e.g. HTTP or HTTPS) from the given URL.
"""
return urlparse(url).scheme
def route(self, url=None, protocol=None, context=None):
"""
Returns the appropriate proxy given a URL or protocol. Arbitrary context data may also be passed where
available.
Args:
url: The specific request URL for which the proxy will be used (if known)
protocol: The protocol in use (e.g. http or https) (if known)
context: Additional context to aid in proxy selection. May include e.g. the requesting client.
"""
if url and protocol is None:
protocol = self._get_protocol_from_url(url)
if protocol and protocol in settings.HTTP_PROXIES:
return {
protocol: settings.HTTP_PROXIES[protocol]
}
return settings.HTTP_PROXIES
def resolve_proxies(url=None, protocol=None, context=None):
"""
Return a dictionary of candidate proxies (compatible with the requests module), or None.
Args:
url: The specific request URL for which the proxy will be used (optional)
protocol: The protocol in use (e.g. http or https) (optional)
context: Arbitrary additional context to aid in proxy selection (optional)
"""
context = context or {}
for item in settings.PROXY_ROUTERS:
router = import_string(item) if type(item) is str else item
if proxies := router().route(url=url, protocol=protocol, context=context):
return proxies

View File

@ -206,22 +206,30 @@ class ViewTab:
Args:
label: Human-friendly text
visible: A callable which determines whether the tab should be displayed. This callable must accept exactly one
argument: the object instance. If a callable is not specified, the tab's visibility will be determined by
its badge (if any) and the value of `hide_if_empty`.
badge: A static value or callable to display alongside the label (optional). If a callable is used, it must
accept a single argument representing the object being viewed.
weight: Numeric weight to influence ordering among other tabs (default: 1000)
permission: The permission required to display the tab (optional).
hide_if_empty: If true, the tab will be displayed only if its badge has a meaningful value. (Tabs without a
badge are always displayed.)
hide_if_empty: If true, the tab will be displayed only if its badge has a meaningful value. (This parameter is
evaluated only if the tab is permitted to be displayed according to the `visible` parameter.)
"""
def __init__(self, label, badge=None, weight=1000, permission=None, hide_if_empty=False):
def __init__(self, label, visible=None, badge=None, weight=1000, permission=None, hide_if_empty=False):
self.label = label
self.visible = visible
self.badge = badge
self.weight = weight
self.permission = permission
self.hide_if_empty = hide_if_empty
def render(self, instance):
"""Return the attributes needed to render a tab in HTML."""
"""
Return the attributes needed to render a tab in HTML if the tab should be displayed. Otherwise, return None.
"""
if self.visible is not None and not self.visible(instance):
return None
badge_value = self._get_badge_value(instance)
if self.badge and self.hide_if_empty and not badge_value:
return None

View File

@ -228,6 +228,7 @@ class L2VPNTypeChoices(ChoiceSet):
TYPE_MPLS_EVPN = 'mpls-evpn'
TYPE_PBB_EVPN = 'pbb-evpn'
TYPE_EVPN_VPWS = 'evpn-vpws'
TYPE_SPB = 'spb'
CHOICES = (
('VPLS', (
@ -255,6 +256,9 @@ class L2VPNTypeChoices(ChoiceSet):
(TYPE_EPTREE, _('Ethernet Private Tree')),
(TYPE_EVPTREE, _('Ethernet Virtual Private Tree')),
)),
('Other', (
(TYPE_SPB, _('SPB')),
)),
)
P2P = (