mirror of
https://github.com/netbox-community/netbox.git
synced 2025-07-24 17:38:37 -06:00
added duplicates() method to IPAddress and Prefix model managers.
refactored condition on IPAddress and Prefix clean method to use new manager method.
This commit is contained in:
parent
98e2145b52
commit
cfaf8b9157
@ -262,6 +262,12 @@ class PrefixQuerySet(NullsFirstQuerySet):
|
|||||||
return queryset
|
return queryset
|
||||||
return filter(lambda p: p.depth <= limit, queryset)
|
return filter(lambda p: p.depth <= limit, queryset)
|
||||||
|
|
||||||
|
def duplicates(self, prefix):
|
||||||
|
return self.filter(
|
||||||
|
vrf=prefix.vrf,
|
||||||
|
prefix=str(prefix)
|
||||||
|
).exclude(pk=prefix.pk)
|
||||||
|
|
||||||
|
|
||||||
class Prefix(CreatedUpdatedModel, CustomFieldModel):
|
class Prefix(CreatedUpdatedModel, CustomFieldModel):
|
||||||
"""
|
"""
|
||||||
@ -299,7 +305,6 @@ class Prefix(CreatedUpdatedModel, CustomFieldModel):
|
|||||||
return reverse('ipam:prefix', args=[self.pk])
|
return reverse('ipam:prefix', args=[self.pk])
|
||||||
|
|
||||||
def clean(self):
|
def clean(self):
|
||||||
|
|
||||||
# Disallow host masks
|
# Disallow host masks
|
||||||
if self.prefix:
|
if self.prefix:
|
||||||
if self.prefix.version == 4 and self.prefix.prefixlen == 32:
|
if self.prefix.version == 4 and self.prefix.prefixlen == 32:
|
||||||
@ -311,6 +316,16 @@ class Prefix(CreatedUpdatedModel, CustomFieldModel):
|
|||||||
'prefix': "Cannot create host addresses (/128) as prefixes. Create an IPv6 address instead."
|
'prefix': "Cannot create host addresses (/128) as prefixes. Create an IPv6 address instead."
|
||||||
})
|
})
|
||||||
|
|
||||||
|
if ((not self.vrf and settings.ENFORCE_GLOBAL_UNIQUE) or (self.vrf and self.vrf.enforce_unique)):
|
||||||
|
dupes = Prefix.objects.duplicates(self)
|
||||||
|
if dupes:
|
||||||
|
raise ValidationError({
|
||||||
|
'prefix': "Duplicate prefix found in {}: {}".format(
|
||||||
|
"VRF {}".format(self.vrf) if self.vrf else "global table",
|
||||||
|
dupes.first(),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
def save(self, *args, **kwargs):
|
def save(self, *args, **kwargs):
|
||||||
if self.prefix:
|
if self.prefix:
|
||||||
# Clear host bits from prefix
|
# Clear host bits from prefix
|
||||||
@ -361,6 +376,12 @@ class IPAddressManager(models.Manager):
|
|||||||
qs = super(IPAddressManager, self).get_queryset()
|
qs = super(IPAddressManager, self).get_queryset()
|
||||||
return qs.annotate(host=RawSQL('INET(HOST(ipam_ipaddress.address))', [])).order_by('family', 'host')
|
return qs.annotate(host=RawSQL('INET(HOST(ipam_ipaddress.address))', [])).order_by('family', 'host')
|
||||||
|
|
||||||
|
def duplicates(self, ip_obj):
|
||||||
|
return self.filter(
|
||||||
|
vrf=ip_obj.vrf,
|
||||||
|
address__net_host=str(ip_obj.address.ip)
|
||||||
|
).exclude(pk=ip_obj.pk)
|
||||||
|
|
||||||
|
|
||||||
class IPAddress(CreatedUpdatedModel, CustomFieldModel):
|
class IPAddress(CreatedUpdatedModel, CustomFieldModel):
|
||||||
"""
|
"""
|
||||||
@ -401,21 +422,22 @@ class IPAddress(CreatedUpdatedModel, CustomFieldModel):
|
|||||||
return reverse('ipam:ipaddress', args=[self.pk])
|
return reverse('ipam:ipaddress', args=[self.pk])
|
||||||
|
|
||||||
def clean(self):
|
def clean(self):
|
||||||
|
if ((not self.vrf and settings.ENFORCE_GLOBAL_UNIQUE) or (self.vrf and self.vrf.enforce_unique)):
|
||||||
|
dupes = IPAddress.objects.duplicates(self)
|
||||||
|
if dupes:
|
||||||
|
raise ValidationError({
|
||||||
|
'address': "Duplicate IP address found in global table: {}".format(dupes.first())
|
||||||
|
})
|
||||||
|
|
||||||
# Enforce unique IP space if applicable
|
# Enforce unique IP space if applicable
|
||||||
if self.vrf and self.vrf.enforce_unique:
|
if ((not self.vrf and settings.ENFORCE_GLOBAL_UNIQUE) or (self.vrf and self.vrf.enforce_unique)):
|
||||||
duplicate_ips = IPAddress.objects.filter(vrf=self.vrf, address__net_host=str(self.address.ip))\
|
dupes = IPAddress.objects.duplicates(self)
|
||||||
.exclude(pk=self.pk)
|
if dupes:
|
||||||
if duplicate_ips:
|
|
||||||
raise ValidationError({
|
raise ValidationError({
|
||||||
'address': "Duplicate IP address found in VRF {}: {}".format(self.vrf, duplicate_ips.first())
|
'address': "Duplicate IP address found in {}: {}".format(
|
||||||
})
|
"VRF {}".format(self.vrf) if self.vrf else "global table",
|
||||||
elif not self.vrf and settings.ENFORCE_GLOBAL_UNIQUE:
|
dupes.first(),
|
||||||
duplicate_ips = IPAddress.objects.filter(vrf=None, address__net_host=str(self.address.ip))\
|
)
|
||||||
.exclude(pk=self.pk)
|
|
||||||
if duplicate_ips:
|
|
||||||
raise ValidationError({
|
|
||||||
'address': "Duplicate IP address found in global table: {}".format(duplicate_ips.first())
|
|
||||||
})
|
})
|
||||||
|
|
||||||
def save(self, *args, **kwargs):
|
def save(self, *args, **kwargs):
|
||||||
|
Loading…
Reference in New Issue
Block a user