mirror of
https://github.com/netbox-community/netbox.git
synced 2025-07-16 12:12:53 -06:00
670 lines
22 KiB
Python
670 lines
22 KiB
Python
import json
|
|
import platform
|
|
|
|
from django import __version__ as DJANGO_VERSION
|
|
from django.conf import settings
|
|
from django.contrib import messages
|
|
from django.contrib.auth.mixins import UserPassesTestMixin
|
|
from django.core.cache import cache
|
|
from django.db import connection, ProgrammingError
|
|
from django.http import HttpResponse, HttpResponseForbidden, Http404
|
|
from django.shortcuts import get_object_or_404, redirect, render
|
|
from django.urls import reverse
|
|
from django.utils.translation import gettext_lazy as _
|
|
from django.views.generic import View
|
|
from django_rq.queues import get_connection, get_queue_by_index, get_redis_connection
|
|
from django_rq.settings import QUEUES_MAP, QUEUES_LIST
|
|
from django_rq.utils import get_statistics
|
|
from rq.exceptions import NoSuchJobError
|
|
from rq.job import Job as RQ_Job, JobStatus as RQJobStatus
|
|
from rq.worker import Worker
|
|
from rq.worker_registration import clean_worker_registry
|
|
|
|
from core.utils import delete_rq_job, enqueue_rq_job, get_rq_jobs_from_status, requeue_rq_job, stop_rq_job
|
|
from netbox.config import get_config, PARAMS
|
|
from netbox.object_actions import AddObject, BulkDelete, BulkExport, DeleteObject
|
|
from netbox.registry import registry
|
|
from netbox.views import generic
|
|
from netbox.views.generic.base import BaseObjectView
|
|
from netbox.views.generic.mixins import TableMixin
|
|
from utilities.data import shallow_compare_dict
|
|
from utilities.forms import ConfirmationForm
|
|
from utilities.htmx import htmx_partial
|
|
from utilities.json import ConfigJSONEncoder
|
|
from utilities.query import count_related
|
|
from utilities.views import ContentTypePermissionRequiredMixin, GetRelatedModelsMixin, ViewTab, register_model_view
|
|
from . import filtersets, forms, tables
|
|
from .choices import DataSourceStatusChoices
|
|
from .jobs import SyncDataSourceJob
|
|
from .models import *
|
|
from .plugins import get_catalog_plugins, get_local_plugins
|
|
from .tables import CatalogPluginTable, JobLogEntryTable, PluginVersionTable
|
|
|
|
|
|
#
|
|
# Data sources
|
|
#
|
|
|
|
@register_model_view(DataSource, 'list', path='', detail=False)
|
|
class DataSourceListView(generic.ObjectListView):
|
|
queryset = DataSource.objects.annotate(
|
|
file_count=count_related(DataFile, 'source')
|
|
)
|
|
filterset = filtersets.DataSourceFilterSet
|
|
filterset_form = forms.DataSourceFilterForm
|
|
table = tables.DataSourceTable
|
|
|
|
|
|
@register_model_view(DataSource)
|
|
class DataSourceView(GetRelatedModelsMixin, generic.ObjectView):
|
|
queryset = DataSource.objects.all()
|
|
|
|
def get_extra_context(self, request, instance):
|
|
return {
|
|
'related_models': self.get_related_models(request, instance),
|
|
}
|
|
|
|
|
|
@register_model_view(DataSource, 'sync')
|
|
class DataSourceSyncView(BaseObjectView):
|
|
queryset = DataSource.objects.all()
|
|
|
|
def get_required_permission(self):
|
|
return 'core.sync_datasource'
|
|
|
|
def get(self, request, pk):
|
|
# Redirect GET requests to the object view
|
|
datasource = get_object_or_404(self.queryset, pk=pk)
|
|
return redirect(datasource.get_absolute_url())
|
|
|
|
def post(self, request, pk):
|
|
datasource = get_object_or_404(self.queryset, pk=pk)
|
|
|
|
# Enqueue the sync job & update the DataSource's status
|
|
job = SyncDataSourceJob.enqueue(instance=datasource, user=request.user)
|
|
datasource.status = DataSourceStatusChoices.QUEUED
|
|
DataSource.objects.filter(pk=datasource.pk).update(status=datasource.status)
|
|
|
|
messages.success(
|
|
request,
|
|
_("Queued job #{id} to sync {datasource}").format(id=job.pk, datasource=datasource)
|
|
)
|
|
return redirect(datasource.get_absolute_url())
|
|
|
|
|
|
@register_model_view(DataSource, 'add', detail=False)
|
|
@register_model_view(DataSource, 'edit')
|
|
class DataSourceEditView(generic.ObjectEditView):
|
|
queryset = DataSource.objects.all()
|
|
form = forms.DataSourceForm
|
|
|
|
|
|
@register_model_view(DataSource, 'delete')
|
|
class DataSourceDeleteView(generic.ObjectDeleteView):
|
|
queryset = DataSource.objects.all()
|
|
|
|
|
|
@register_model_view(DataSource, 'bulk_import', path='import', detail=False)
|
|
class DataSourceBulkImportView(generic.BulkImportView):
|
|
queryset = DataSource.objects.all()
|
|
model_form = forms.DataSourceImportForm
|
|
|
|
|
|
@register_model_view(DataSource, 'bulk_edit', path='edit', detail=False)
|
|
class DataSourceBulkEditView(generic.BulkEditView):
|
|
queryset = DataSource.objects.annotate(
|
|
count_files=count_related(DataFile, 'source')
|
|
)
|
|
filterset = filtersets.DataSourceFilterSet
|
|
table = tables.DataSourceTable
|
|
form = forms.DataSourceBulkEditForm
|
|
|
|
|
|
@register_model_view(DataSource, 'bulk_rename', path='rename', detail=False)
|
|
class DataSourceBulkRenameView(generic.BulkRenameView):
|
|
queryset = DataSource.objects.all()
|
|
|
|
|
|
@register_model_view(DataSource, 'bulk_delete', path='delete', detail=False)
|
|
class DataSourceBulkDeleteView(generic.BulkDeleteView):
|
|
queryset = DataSource.objects.annotate(
|
|
count_files=count_related(DataFile, 'source')
|
|
)
|
|
filterset = filtersets.DataSourceFilterSet
|
|
table = tables.DataSourceTable
|
|
|
|
|
|
#
|
|
# Data files
|
|
#
|
|
|
|
@register_model_view(DataFile, 'list', path='', detail=False)
|
|
class DataFileListView(generic.ObjectListView):
|
|
queryset = DataFile.objects.defer('data')
|
|
filterset = filtersets.DataFileFilterSet
|
|
filterset_form = forms.DataFileFilterForm
|
|
table = tables.DataFileTable
|
|
actions = (BulkDelete,)
|
|
|
|
|
|
@register_model_view(DataFile)
|
|
class DataFileView(generic.ObjectView):
|
|
queryset = DataFile.objects.all()
|
|
actions = (DeleteObject,)
|
|
|
|
|
|
@register_model_view(DataFile, 'delete')
|
|
class DataFileDeleteView(generic.ObjectDeleteView):
|
|
queryset = DataFile.objects.all()
|
|
|
|
|
|
@register_model_view(DataFile, 'bulk_delete', path='delete', detail=False)
|
|
class DataFileBulkDeleteView(generic.BulkDeleteView):
|
|
queryset = DataFile.objects.defer('data')
|
|
filterset = filtersets.DataFileFilterSet
|
|
table = tables.DataFileTable
|
|
|
|
|
|
#
|
|
# Jobs
|
|
#
|
|
|
|
@register_model_view(Job, 'list', path='', detail=False)
|
|
class JobListView(generic.ObjectListView):
|
|
queryset = Job.objects.defer('data')
|
|
filterset = filtersets.JobFilterSet
|
|
filterset_form = forms.JobFilterForm
|
|
table = tables.JobTable
|
|
actions = (BulkExport, BulkDelete)
|
|
|
|
|
|
@register_model_view(Job)
|
|
class JobView(generic.ObjectView):
|
|
queryset = Job.objects.all()
|
|
actions = (DeleteObject,)
|
|
|
|
|
|
@register_model_view(Job, 'log')
|
|
class JobLogView(generic.ObjectView):
|
|
queryset = Job.objects.all()
|
|
actions = (DeleteObject,)
|
|
template_name = 'core/job/log.html'
|
|
tab = ViewTab(
|
|
label=_('Log'),
|
|
badge=lambda obj: len(obj.log_entries),
|
|
weight=500,
|
|
)
|
|
|
|
def get_extra_context(self, request, instance):
|
|
table = JobLogEntryTable(instance.log_entries)
|
|
table.configure(request)
|
|
return {
|
|
'table': table,
|
|
}
|
|
|
|
|
|
@register_model_view(Job, 'delete')
|
|
class JobDeleteView(generic.ObjectDeleteView):
|
|
queryset = Job.objects.defer('data')
|
|
|
|
|
|
@register_model_view(Job, 'bulk_delete', path='delete', detail=False)
|
|
class JobBulkDeleteView(generic.BulkDeleteView):
|
|
queryset = Job.objects.defer('data')
|
|
filterset = filtersets.JobFilterSet
|
|
table = tables.JobTable
|
|
|
|
|
|
#
|
|
# Change logging
|
|
#
|
|
|
|
@register_model_view(ObjectChange, 'list', path='', detail=False)
|
|
class ObjectChangeListView(generic.ObjectListView):
|
|
queryset = ObjectChange.objects.valid_models()
|
|
filterset = filtersets.ObjectChangeFilterSet
|
|
filterset_form = forms.ObjectChangeFilterForm
|
|
table = tables.ObjectChangeTable
|
|
template_name = 'core/objectchange_list.html'
|
|
actions = (BulkExport,)
|
|
|
|
|
|
@register_model_view(ObjectChange)
|
|
class ObjectChangeView(generic.ObjectView):
|
|
queryset = ObjectChange.objects.valid_models()
|
|
|
|
def get_extra_context(self, request, instance):
|
|
related_changes = ObjectChange.objects.valid_models().restrict(request.user, 'view').filter(
|
|
request_id=instance.request_id
|
|
).exclude(
|
|
pk=instance.pk
|
|
)
|
|
related_changes_table = tables.ObjectChangeTable(
|
|
data=related_changes[:50],
|
|
orderable=False
|
|
)
|
|
related_changes_table.configure(request)
|
|
|
|
objectchanges = ObjectChange.objects.valid_models().restrict(request.user, 'view').filter(
|
|
changed_object_type=instance.changed_object_type,
|
|
changed_object_id=instance.changed_object_id,
|
|
)
|
|
|
|
next_change = objectchanges.filter(time__gt=instance.time).order_by('time').first()
|
|
prev_change = objectchanges.filter(time__lt=instance.time).order_by('-time').first()
|
|
|
|
if not instance.prechange_data and instance.action in ['update', 'delete'] and prev_change:
|
|
non_atomic_change = True
|
|
prechange_data = prev_change.postchange_data_clean
|
|
else:
|
|
non_atomic_change = False
|
|
prechange_data = instance.prechange_data_clean
|
|
|
|
if prechange_data and instance.postchange_data:
|
|
diff_added = shallow_compare_dict(
|
|
prechange_data or dict(),
|
|
instance.postchange_data_clean or dict(),
|
|
exclude=['last_updated'],
|
|
)
|
|
diff_removed = {
|
|
x: prechange_data.get(x) for x in diff_added
|
|
} if prechange_data else {}
|
|
else:
|
|
diff_added = None
|
|
diff_removed = None
|
|
|
|
return {
|
|
'diff_added': diff_added,
|
|
'diff_removed': diff_removed,
|
|
'next_change': next_change,
|
|
'prev_change': prev_change,
|
|
'related_changes_table': related_changes_table,
|
|
'related_changes_count': related_changes.count(),
|
|
'non_atomic_change': non_atomic_change
|
|
}
|
|
|
|
|
|
#
|
|
# Config Revisions
|
|
#
|
|
|
|
@register_model_view(ConfigRevision, 'list', path='', detail=False)
|
|
class ConfigRevisionListView(generic.ObjectListView):
|
|
queryset = ConfigRevision.objects.all()
|
|
filterset = filtersets.ConfigRevisionFilterSet
|
|
filterset_form = forms.ConfigRevisionFilterForm
|
|
table = tables.ConfigRevisionTable
|
|
actions = (AddObject, BulkExport)
|
|
|
|
|
|
@register_model_view(ConfigRevision)
|
|
class ConfigRevisionView(generic.ObjectView):
|
|
queryset = ConfigRevision.objects.all()
|
|
|
|
|
|
@register_model_view(ConfigRevision, 'add', detail=False)
|
|
class ConfigRevisionEditView(generic.ObjectEditView):
|
|
queryset = ConfigRevision.objects.all()
|
|
form = forms.ConfigRevisionForm
|
|
|
|
|
|
@register_model_view(ConfigRevision, 'delete')
|
|
class ConfigRevisionDeleteView(generic.ObjectDeleteView):
|
|
queryset = ConfigRevision.objects.all()
|
|
|
|
|
|
@register_model_view(ConfigRevision, 'bulk_delete', path='delete', detail=False)
|
|
class ConfigRevisionBulkDeleteView(generic.BulkDeleteView):
|
|
queryset = ConfigRevision.objects.all()
|
|
filterset = filtersets.ConfigRevisionFilterSet
|
|
table = tables.ConfigRevisionTable
|
|
|
|
|
|
@register_model_view(ConfigRevision, 'restore')
|
|
class ConfigRevisionRestoreView(ContentTypePermissionRequiredMixin, View):
|
|
|
|
def get_required_permission(self):
|
|
return 'core.configrevision_edit'
|
|
|
|
def get(self, request, pk):
|
|
candidate_config = get_object_or_404(ConfigRevision, pk=pk)
|
|
|
|
# Get the current ConfigRevision
|
|
config_version = get_config().version
|
|
current_config = ConfigRevision.objects.filter(pk=config_version).first()
|
|
|
|
params = []
|
|
for param in PARAMS:
|
|
params.append((
|
|
param.name,
|
|
current_config.data.get(param.name, None) if current_config else None,
|
|
candidate_config.data.get(param.name, None)
|
|
))
|
|
|
|
return render(request, 'core/configrevision_restore.html', {
|
|
'object': candidate_config,
|
|
'params': params,
|
|
})
|
|
|
|
def post(self, request, pk):
|
|
if not request.user.has_perm('core.configrevision_edit'):
|
|
return HttpResponseForbidden()
|
|
|
|
candidate_config = get_object_or_404(ConfigRevision, pk=pk)
|
|
candidate_config.activate()
|
|
messages.success(request, _("Restored configuration revision #{id}").format(id=pk))
|
|
|
|
return redirect(candidate_config.get_absolute_url())
|
|
|
|
|
|
#
|
|
# Background Tasks (RQ)
|
|
#
|
|
|
|
class BaseRQView(UserPassesTestMixin, View):
|
|
|
|
def test_func(self):
|
|
return self.request.user.is_staff
|
|
|
|
|
|
class BackgroundQueueListView(TableMixin, BaseRQView):
|
|
table = tables.BackgroundQueueTable
|
|
|
|
def get(self, request):
|
|
data = get_statistics(run_maintenance_tasks=True)["queues"]
|
|
table = self.get_table(data, request, bulk_actions=False)
|
|
|
|
return render(request, 'core/rq_queue_list.html', {
|
|
'table': table,
|
|
})
|
|
|
|
|
|
class BackgroundTaskListView(TableMixin, BaseRQView):
|
|
table = tables.BackgroundTaskTable
|
|
|
|
def get_table_data(self, request, queue, status):
|
|
|
|
# Call get_jobs() to returned queued tasks
|
|
if status == RQJobStatus.QUEUED:
|
|
return queue.get_jobs()
|
|
|
|
return get_rq_jobs_from_status(queue, status)
|
|
|
|
def get(self, request, queue_index, status):
|
|
queue = get_queue_by_index(queue_index)
|
|
data = self.get_table_data(request, queue, status)
|
|
table = self.get_table(data, request, False)
|
|
|
|
# If this is an HTMX request, return only the rendered table HTML
|
|
if htmx_partial(request):
|
|
return render(request, 'htmx/table.html', {
|
|
'table': table,
|
|
})
|
|
|
|
return render(request, 'core/rq_task_list.html', {
|
|
'table': table,
|
|
'queue': queue,
|
|
'status': status,
|
|
})
|
|
|
|
|
|
class BackgroundTaskView(BaseRQView):
|
|
|
|
def get(self, request, job_id):
|
|
# all the RQ queues should use the same connection
|
|
config = QUEUES_LIST[0]
|
|
try:
|
|
job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
|
|
except NoSuchJobError:
|
|
raise Http404(_("Job {job_id} not found").format(job_id=job_id))
|
|
|
|
queue_index = QUEUES_MAP[job.origin]
|
|
queue = get_queue_by_index(queue_index)
|
|
|
|
try:
|
|
exc_info = job._exc_info
|
|
except AttributeError:
|
|
exc_info = None
|
|
|
|
return render(request, 'core/rq_task.html', {
|
|
'queue': queue,
|
|
'job': job,
|
|
'queue_index': queue_index,
|
|
'dependency_id': job._dependency_id,
|
|
'exc_info': exc_info,
|
|
})
|
|
|
|
|
|
class BackgroundTaskDeleteView(BaseRQView):
|
|
|
|
def get(self, request, job_id):
|
|
if not request.htmx:
|
|
return redirect(reverse('core:background_queue_list'))
|
|
|
|
form = ConfirmationForm(initial=request.GET)
|
|
|
|
return render(request, 'htmx/delete_form.html', {
|
|
'object_type': 'background task',
|
|
'object': job_id,
|
|
'form': form,
|
|
'form_url': reverse('core:background_task_delete', kwargs={'job_id': job_id})
|
|
})
|
|
|
|
def post(self, request, job_id):
|
|
form = ConfirmationForm(request.POST)
|
|
|
|
if form.is_valid():
|
|
delete_rq_job(job_id)
|
|
messages.success(request, _('Job {id} has been deleted.').format(id=job_id))
|
|
else:
|
|
messages.error(request, _('Error deleting job {id}: {error}').format(id=job_id, error=form.errors[0]))
|
|
|
|
return redirect(reverse('core:background_queue_list'))
|
|
|
|
|
|
class BackgroundTaskRequeueView(BaseRQView):
|
|
|
|
def get(self, request, job_id):
|
|
requeue_rq_job(job_id)
|
|
messages.success(request, _('Job {id} has been re-enqueued.').format(id=job_id))
|
|
return redirect(reverse('core:background_task', args=[job_id]))
|
|
|
|
|
|
class BackgroundTaskEnqueueView(BaseRQView):
|
|
|
|
def get(self, request, job_id):
|
|
# all the RQ queues should use the same connection
|
|
enqueue_rq_job(job_id)
|
|
messages.success(request, _('Job {id} has been enqueued.').format(id=job_id))
|
|
return redirect(reverse('core:background_task', args=[job_id]))
|
|
|
|
|
|
class BackgroundTaskStopView(BaseRQView):
|
|
|
|
def get(self, request, job_id):
|
|
stopped_jobs = stop_rq_job(job_id)
|
|
if len(stopped_jobs) == 1:
|
|
messages.success(request, _('Job {id} has been stopped.').format(id=job_id))
|
|
else:
|
|
messages.error(request, _('Failed to stop job {id}').format(id=job_id))
|
|
|
|
return redirect(reverse('core:background_task', args=[job_id]))
|
|
|
|
|
|
class WorkerListView(TableMixin, BaseRQView):
|
|
table = tables.WorkerTable
|
|
|
|
def get_table_data(self, request, queue):
|
|
clean_worker_registry(queue)
|
|
all_workers = Worker.all(queue.connection)
|
|
workers = [worker for worker in all_workers if queue.name in worker.queue_names()]
|
|
return workers
|
|
|
|
def get(self, request, queue_index):
|
|
queue = get_queue_by_index(queue_index)
|
|
data = self.get_table_data(request, queue)
|
|
|
|
table = self.get_table(data, request, False)
|
|
|
|
# If this is an HTMX request, return only the rendered table HTML
|
|
if htmx_partial(request):
|
|
if not request.htmx.target:
|
|
table.embedded = True
|
|
# Hide selection checkboxes
|
|
if 'pk' in table.base_columns:
|
|
table.columns.hide('pk')
|
|
return render(request, 'htmx/table.html', {
|
|
'table': table,
|
|
'queue': queue,
|
|
})
|
|
|
|
return render(request, 'core/rq_worker_list.html', {
|
|
'table': table,
|
|
'queue': queue,
|
|
})
|
|
|
|
|
|
class WorkerView(BaseRQView):
|
|
|
|
def get(self, request, key):
|
|
# all the RQ queues should use the same connection
|
|
config = QUEUES_LIST[0]
|
|
worker = Worker.find_by_key('rq:worker:' + key, connection=get_redis_connection(config['connection_config']))
|
|
# Convert microseconds to milliseconds
|
|
worker.total_working_time = worker.total_working_time / 1000
|
|
|
|
return render(request, 'core/rq_worker.html', {
|
|
'worker': worker,
|
|
'job': worker.get_current_job(),
|
|
'total_working_time': worker.total_working_time * 1000,
|
|
})
|
|
|
|
|
|
#
|
|
# System
|
|
#
|
|
|
|
class SystemView(UserPassesTestMixin, View):
|
|
|
|
def test_func(self):
|
|
return self.request.user.is_staff
|
|
|
|
def get(self, request):
|
|
|
|
# System stats
|
|
psql_version = db_name = db_size = None
|
|
try:
|
|
with connection.cursor() as cursor:
|
|
cursor.execute("SELECT version()")
|
|
psql_version = cursor.fetchone()[0]
|
|
psql_version = psql_version.split('(')[0].strip()
|
|
cursor.execute("SELECT current_database()")
|
|
db_name = cursor.fetchone()[0]
|
|
cursor.execute(f"SELECT pg_size_pretty(pg_database_size('{db_name}'))")
|
|
db_size = cursor.fetchone()[0]
|
|
except (ProgrammingError, IndexError):
|
|
pass
|
|
stats = {
|
|
'netbox_release': settings.RELEASE,
|
|
'django_version': DJANGO_VERSION,
|
|
'python_version': platform.python_version(),
|
|
'postgresql_version': psql_version,
|
|
'database_name': db_name,
|
|
'database_size': db_size,
|
|
'rq_worker_count': Worker.count(get_connection('default')),
|
|
}
|
|
|
|
# Configuration
|
|
config = get_config()
|
|
|
|
# Raw data export
|
|
if 'export' in request.GET:
|
|
stats['netbox_release'] = stats['netbox_release'].asdict()
|
|
params = [param.name for param in PARAMS]
|
|
data = {
|
|
**stats,
|
|
'plugins': registry['plugins']['installed'],
|
|
'config': {
|
|
k: getattr(config, k) for k in sorted(params)
|
|
},
|
|
}
|
|
response = HttpResponse(json.dumps(data, cls=ConfigJSONEncoder, indent=4), content_type='text/json')
|
|
response['Content-Disposition'] = 'attachment; filename="netbox.json"'
|
|
return response
|
|
|
|
# Serialize any CustomValidator classes
|
|
for attr in ['CUSTOM_VALIDATORS', 'PROTECTION_RULES']:
|
|
if hasattr(config, attr) and getattr(config, attr, None):
|
|
setattr(config, attr, json.dumps(getattr(config, attr), cls=ConfigJSONEncoder, indent=4))
|
|
|
|
return render(request, 'core/system.html', {
|
|
'stats': stats,
|
|
'config': config,
|
|
})
|
|
|
|
|
|
#
|
|
# Plugins
|
|
#
|
|
|
|
class BasePluginView(UserPassesTestMixin, View):
|
|
CACHE_KEY_CATALOG_ERROR = 'plugins-catalog-error'
|
|
|
|
def test_func(self):
|
|
return self.request.user.is_staff
|
|
|
|
def get_cached_plugins(self, request):
|
|
catalog_plugins = {}
|
|
catalog_plugins_error = cache.get(self.CACHE_KEY_CATALOG_ERROR, default=False)
|
|
if not catalog_plugins_error:
|
|
catalog_plugins = get_catalog_plugins()
|
|
if not catalog_plugins and not settings.ISOLATED_DEPLOYMENT:
|
|
# Cache for 5 minutes to avoid spamming connection
|
|
cache.set(self.CACHE_KEY_CATALOG_ERROR, True, 300)
|
|
messages.warning(request, _("Plugins catalog could not be loaded"))
|
|
|
|
return get_local_plugins(catalog_plugins)
|
|
|
|
|
|
class PluginListView(BasePluginView):
|
|
|
|
def get(self, request):
|
|
q = request.GET.get('q', None)
|
|
|
|
plugins = self.get_cached_plugins(request).values()
|
|
if q:
|
|
plugins = [obj for obj in plugins if q.casefold() in obj.title_short.casefold()]
|
|
|
|
plugins = [plugin for plugin in plugins if not plugin.hidden]
|
|
|
|
table = CatalogPluginTable(plugins, user=request.user)
|
|
table.configure(request)
|
|
|
|
# If this is an HTMX request, return only the rendered table HTML
|
|
if htmx_partial(request):
|
|
return render(request, 'htmx/table.html', {
|
|
'table': table,
|
|
})
|
|
|
|
return render(request, 'core/plugin_list.html', {
|
|
'table': table,
|
|
})
|
|
|
|
|
|
class PluginView(BasePluginView):
|
|
|
|
def get(self, request, name):
|
|
|
|
plugins = self.get_cached_plugins(request)
|
|
if name not in plugins:
|
|
raise Http404(_("Plugin {name} not found").format(name=name))
|
|
plugin = plugins[name]
|
|
|
|
table = PluginVersionTable(plugin.release_recent_history, user=request.user)
|
|
table.configure(request)
|
|
|
|
return render(request, 'core/plugin.html', {
|
|
'plugin': plugin,
|
|
'table': table,
|
|
})
|