Annotate queryset instead of using a model property

This commit is contained in:
Daniel Sheppard 2025-03-27 14:36:09 -05:00
parent 1777d4228e
commit e32d2ca637
3 changed files with 63 additions and 10 deletions

View File

@ -2,6 +2,8 @@ from copy import deepcopy
from django.core.exceptions import ObjectDoesNotExist, PermissionDenied from django.core.exceptions import ObjectDoesNotExist, PermissionDenied
from django.db import transaction from django.db import transaction
from django.db.models import Subquery, OuterRef
from django.db.models.functions import JSONObject
from django.shortcuts import get_object_or_404 from django.shortcuts import get_object_or_404
from django.utils.translation import gettext as _ from django.utils.translation import gettext as _
from django_pglocks import advisory_lock from django_pglocks import advisory_lock
@ -21,6 +23,7 @@ from netbox.api.viewsets.mixins import ObjectValidationMixin
from netbox.config import get_config from netbox.config import get_config
from netbox.constants import ADVISORY_LOCK_KEYS from netbox.constants import ADVISORY_LOCK_KEYS
from utilities.api import get_serializer_for_model from utilities.api import get_serializer_for_model
from utilities.fields import JSONModelField
from . import serializers from . import serializers
@ -90,6 +93,32 @@ class PrefixViewSet(NetBoxModelViewSet):
return serializers.PrefixLengthSerializer return serializers.PrefixLengthSerializer
return super().get_serializer_class() return super().get_serializer_class()
def get_queryset(self):
"""
Return the query set with additional annotations for Aggregate and RIR
"""
qs = super().get_queryset()
# Determine the fields to return
aggregate_fields = JSONObject(**{f.name: f.name for f in Aggregate._meta.get_fields()})
rir_fields = JSONObject(**{f.name: f.name for f in RIR._meta.get_fields()})
# Get the outer reference
prefix_field = OuterRef("prefix")
aggregate_field = OuterRef("aggregate_id")
aggregates = Aggregate.objects.filter(prefix__net_contains_or_equals=prefix_field)
rirs = RIR.objects.filter(aggregates=aggregate_field)
# The sub queries for the annotation, returning a json object of the related model
agg_sq = Subquery(
aggregates.values_list(aggregate_fields)[:1], output_field=JSONModelField(related_model=Aggregate)
)
agg_id_sq = Subquery(aggregates.values_list('pk', flat=True)[:1])
rir_sq = Subquery(rirs.values_list(rir_fields)[:1], output_field=JSONModelField(related_model=RIR))
return qs.annotate(aggregate=agg_sq, aggregate_id=agg_id_sq).annotate(rir=rir_sq)
class IPRangeViewSet(NetBoxModelViewSet): class IPRangeViewSet(NetBoxModelViewSet):
queryset = IPRange.objects.all() queryset = IPRange.objects.all()

View File

@ -322,15 +322,6 @@ class Prefix(ContactsMixin, GetAvailablePrefixesMixin, CachedScopeMixin, Primary
super().save(*args, **kwargs) super().save(*args, **kwargs)
@property
def aggregate(self):
return Aggregate.objects.filter(prefix__net_contains_or_equals=self.prefix).first()
@property
def rir(self):
aggregate = self.aggregate
return aggregate.rir if aggregate else None
@property @property
def family(self): def family(self):
return self.prefix.version if self.prefix else None return self.prefix.version if self.prefix else None

View File

@ -1,7 +1,9 @@
from collections import defaultdict from collections import defaultdict
from django.contrib.contenttypes.fields import GenericForeignKey from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation
from django.db import models from django.db import models
from django.db.models import ForeignKey, ManyToOneRel
from django.forms import JSONField
from django.utils.safestring import mark_safe from django.utils.safestring import mark_safe
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
@ -13,6 +15,7 @@ __all__ = (
'CounterCacheField', 'CounterCacheField',
'NaturalOrderingField', 'NaturalOrderingField',
'RestrictedGenericForeignKey', 'RestrictedGenericForeignKey',
'JSONModelField',
) )
@ -186,3 +189,33 @@ class CounterCacheField(models.BigIntegerField):
kwargs["to_model"] = self.to_model_name kwargs["to_model"] = self.to_model_name
kwargs["to_field"] = self.to_field_name kwargs["to_field"] = self.to_field_name
return name, path, args, kwargs return name, path, args, kwargs
class JSONModelField(JSONField):
def __init__(self, related_model=None, *args, **kwargs):
"""
Extract the related model from the kwargs and set after instantiation
"""
super().__init__(*args, **kwargs)
self.related_model = related_model
def from_db_value(self, value, expression, connection):
"""
Return the actual instantiated model from the fields, minus the models that cannot be worked with
"""
data = super().from_db_value(value, expression, connection)
# Return nothing if there is nothing
if data is None:
return None
# Extract the fields from the meta for processing
fields = {f.name: f for f in self.related_model._meta.get_fields()}
keys = data.copy().keys()
for key in keys:
if key not in fields or isinstance(fields.get(key), (GenericRelation, ForeignKey, ManyToOneRel, )):
# Delete un-parsable fields
del data[key]
# Return the full model minus deleted fields
return self.related_model(**data)