import logging from collections import deque from pathlib import Path
from django.contrib import admin, messages from django.contrib.sites.admin import SiteAdmin as DjangoSiteAdmin from django.contrib.sites.models import Site from django import forms from django.shortcuts import redirect, render, get_object_or_404 from django.urls import NoReverseMatch, path, reverse from django.utils.html import format_html
from django.template.response import TemplateResponse from django.http import FileResponse, JsonResponse from django.utils import timezone from django.db.models import Count from django.core.exceptions import FieldError from django.db.models.functions import TruncDate from datetime import datetime, time, timedelta import ipaddress from django.apps import apps as django_apps from django.conf import settings from django.utils.translation import gettext_lazy as _, ngettext from django.core.management import CommandError, call_command
from nodes.models import Node, NodeRole from nodes.utils import capture_screenshot, save_screenshot
from .forms import UserManualAdminForm from .module_defaults import reload_default_modules as restore_default_modules from .site_config import ensure_site_fields from .utils import landing_leads_supported
from .models import ( SiteBadge, Application, SiteProxy, Module, Landing, LandingLead, RoleLanding, Favorite, ViewHistory, UserManual, UserStory, ) from django.contrib.contenttypes.models import ContentType from core.models import ReleaseManager from core.user_data import EntityModelAdmin
# Module-level logger named after this module, so its records can be routed
# and filtered per module.
logger = logging.getLogger(__name__)
def get_local_app_choices():
    """Build ``(label, verbose_name)`` choice pairs for the local apps.

    Walks the labels listed in ``settings.LOCAL_APPS`` (treated as empty when
    the setting is absent) and leaves out any label the app registry cannot
    resolve.
    """
    local_labels = getattr(settings, "LOCAL_APPS", [])
    pairs = []
    for label in local_labels:
        try:
            app_config = django_apps.get_app_config(label)
        except LookupError:
            # Listed in the setting but unknown to the registry: skip it.
            continue
        pairs.append((app_config.label, app_config.verbose_name))
    return pairs
class SiteBadgeInline(admin.StackedInline):
    """Stacked inline exposing the favicon and landing override of a ``SiteBadge``."""

    model = SiteBadge
    fields = ("favicon", "landing_override")
    # No blank extra forms, and the inline row cannot be deleted.
    extra = 0
    can_delete = False
class SiteForm(forms.ModelForm):
    """Model form for ``Site`` exposing every field, with ``name`` optional."""

    # Declared explicitly so the field is not required.
    name = forms.CharField(required=False)

    class Meta:
        model = Site
        fields = "__all__"
# Called at import time, before the filter and admin classes below that refer
# to the ``managed`` and ``require_https`` fields.
# NOTE(review): presumably this makes those fields available on ``Site`` —
# confirm against ``site_config.ensure_site_fields``.
ensure_site_fields()
class _BooleanAttributeListFilter(admin.SimpleListFilter):
    """Yes/No list filter over a boolean field of
    :class:`~django.contrib.sites.models.Site`.

    Subclasses provide ``title``, ``parameter_name`` and ``field_name``.
    """

    # Name of the model field the selected value is matched against.
    field_name: str

    def lookups(self, request, model_admin):  # pragma: no cover - admin UI
        """Offer the two fixed choices, keyed ``"1"`` (yes) and ``"0"`` (no)."""
        return (("1", _("Yes")), ("0", _("No")))

    def queryset(self, request, queryset):
        """Narrow ``queryset`` to the selected boolean value, if one is set."""
        selected = self.value()
        if selected == "1":
            wanted = True
        elif selected == "0":
            wanted = False
        else:
            # Nothing selected (or an unknown value): leave the list as is.
            return queryset
        try:
            filtered = queryset.filter(**{self.field_name: wanted})
        except FieldError:  # pragma: no cover - defensive when fields missing
            return queryset
        return filtered
class ManagedSiteListFilter(_BooleanAttributeListFilter):
    """Yes/No list filter on the ``managed`` field."""

    # The base class is ``_BooleanAttributeListFilter`` (leading underscore);
    # the title goes through ``_`` so it is translatable like the other labels.
    title = _("Managed by local NGINX")
    parameter_name = "managed"
    field_name = "managed"
class RequireHttpsListFilter(_BooleanAttributeListFilter):
    """Yes/No list filter on the ``require_https`` field."""

    # The base class is ``_BooleanAttributeListFilter`` (leading underscore);
    # the title goes through ``_`` so it is translatable like the other labels.
    title = _("Require HTTPS")
    parameter_name = "require_https"
    field_name = "require_https"
class SiteAdmin(DjangoSiteAdmin):
    """Site admin with NGINX flags, a screenshot action and maintenance views.

    Extends Django's stock site admin with the ``managed`` and
    ``require_https`` fields, a "Capture screenshot" action, and two extra
    admin URLs: one that registers the host serving the request as a site and
    one that reloads the site fixtures found under ``core/fixtures``.
    """

    form = SiteForm
    inlines = [SiteBadgeInline]
    change_list_template = "admin/sites/site/change_list.html"
    fields = ("domain", "name", "managed", "require_https")
    list_display = ("domain", "name", "managed", "require_https")
    list_filter = (ManagedSiteListFilter, RequireHttpsListFilter)
    actions = ["capture_screenshot"]

    @admin.action(description="Capture screenshot")
    def capture_screenshot(self, request, queryset):
        """Capture and store a screenshot of each selected site's root page.

        A failure is reported as an error message for that site only; the
        remaining sites are still processed.
        """
        node = Node.get_local()
        for site in queryset:
            url = f"http://{site.domain}/"
            try:
                # The bare name resolves to the module-level helper imported
                # from ``nodes.utils``; class attributes are not in scope in
                # a method body.
                image_path = capture_screenshot(url)
                screenshot = save_screenshot(image_path, node=node, method="ADMIN")
            except Exception as exc:  # pragma: no cover - browser issues
                self.message_user(request, f"{site.domain}: {exc}", messages.ERROR)
                continue
            if screenshot:
                link = reverse("admin:nodes_contentsample_change", args=[screenshot.pk])
                self.message_user(
                    request,
                    format_html(
                        'Screenshot for {} saved. <a href="{}">View</a>',
                        site.domain,
                        link,
                    ),
                    messages.SUCCESS,
                )
            else:
                # A falsy result is reported as a duplicate that was not saved.
                self.message_user(
                    request,
                    f"{site.domain}: duplicate screenshot; not saved",
                    messages.INFO,
                )

    # Each permission hook grants access when either the parent admin allows
    # it or the user holds the corresponding ``sites.*`` permission.

    def has_add_permission(self, request):
        """Allow adding when the parent check or ``sites.add_site`` passes."""
        if super().has_add_permission(request):
            return True
        return request.user.has_perm("sites.add_site")

    def has_change_permission(self, request, obj=None):
        """Allow changing when the parent check or ``sites.change_site`` passes."""
        if super().has_change_permission(request, obj=obj):
            return True
        return request.user.has_perm("sites.change_site")

    def has_delete_permission(self, request, obj=None):
        """Allow deleting when the parent check or ``sites.delete_site`` passes."""
        if super().has_delete_permission(request, obj=obj):
            return True
        return request.user.has_perm("sites.delete_site")

    def has_view_permission(self, request, obj=None):
        """Allow viewing via the parent check, ``sites.view_site`` or ``sites.change_site``."""
        if super().has_view_permission(request, obj=obj):
            return True
        return request.user.has_perm("sites.view_site") or request.user.has_perm(
            "sites.change_site"
        )

    def has_module_permission(self, request):
        """Show the module when the parent allows it or the user has any ``sites`` permission."""
        if super().has_module_permission(request):
            return True
        return request.user.has_module_perms("sites")

    def _notify_nginx_staged(self, request):
        """Tell the user that the NGINX configuration still has to be applied."""
        self.message_user(
            request,
            _(
                "Managed NGINX configuration staged. Run network-setup.sh to apply changes."
            ),
            messages.INFO,
        )

    def save_model(self, request, obj, form, change):
        """Save the site and show the NGINX notice if an NGINX-related flag changed."""
        super().save_model(request, obj, form, change)
        if {"managed", "require_https"} & set(form.changed_data or []):
            self._notify_nginx_staged(request)

    def delete_model(self, request, obj):
        """Delete the site and show the NGINX notice."""
        super().delete_model(request, obj)
        self._notify_nginx_staged(request)

    def _reload_site_fixtures(self, request):
        """Load every site fixture and report the outcome through messages.

        Fixtures are the ``references__00_site_*.json`` files (in sorted
        order) plus ``sigil_roots__site.json`` when it exists. A fixture that
        fails with ``CommandError`` is reported and the rest are still loaded.
        Always returns ``None``.
        """
        fixtures_dir = Path(settings.BASE_DIR) / "core" / "fixtures"
        fixture_paths = sorted(fixtures_dir.glob("references__00_site_*.json"))
        sigil_fixture = fixtures_dir / "sigil_roots__site.json"
        if sigil_fixture.exists():
            fixture_paths.append(sigil_fixture)
        if not fixture_paths:
            self.message_user(request, _("No site fixtures found."), messages.WARNING)
            return None

        loaded = 0
        for fixture in fixture_paths:
            try:
                call_command("loaddata", str(fixture), verbosity=0)
            except CommandError as exc:
                self.message_user(
                    request,
                    _("%(fixture)s: %(error)s")
                    % {"fixture": fixture.name, "error": exc},
                    messages.ERROR,
                )
            else:
                loaded += 1

        if loaded:
            message = ngettext(
                "Reloaded %(count)d site fixture.",
                "Reloaded %(count)d site fixtures.",
                loaded,
            ) % {"count": loaded}
            self.message_user(request, message, messages.SUCCESS)
        return None

    def reload_site_fixtures(self, request):
        """Admin view: reload the site fixtures on POST, then return to the list."""
        if request.method != "POST":
            return redirect("..")
        self._reload_site_fixtures(request)
        return redirect("..")

    def get_urls(self):
        """Prepend the custom admin routes so they take precedence over the defaults."""
        urls = super().get_urls()
        custom = [
            path(
                "register-current/",
                self.admin_site.admin_view(self.register_current),
                name="pages_siteproxy_register_current",
            ),
            path(
                "reload-site-fixtures/",
                self.admin_site.admin_view(self.reload_site_fixtures),
                name="pages_siteproxy_reload_site_fixtures",
            ),
        ]
        return custom + urls

    def register_current(self, request):
        """Admin view: register the host serving this request as a ``Site``.

        The port is dropped from the host. When the host is an IP address the
        site is created with an empty name; otherwise the domain doubles as
        the name. Redirects back to the change list either way.
        """
        host = request.get_host()
        if host.startswith("["):
            # Bracketed IPv6 literal, optionally followed by ":port".
            # Splitting on the first colon would cut the address itself, so
            # keep everything up to and including the closing bracket.
            domain = host.partition("]")[0] + "]"
        else:
            domain = host.split(":")[0]
        try:
            # ``ip_address`` does not accept the surrounding brackets.
            ipaddress.ip_address(domain.strip("[]"))
        except ValueError:
            name = domain
        else:
            name = ""
        site, created = Site.objects.get_or_create(
            domain=domain, defaults={"name": name}
        )
        if created:
            self.message_user(request, "Current domain registered", messages.SUCCESS)
        else:
            self.message_user(
                request, "Current domain already registered", messages.INFO
            )
        return redirect("..")
# Swap the stock ``Site`` admin registration for ``SiteAdmin``, exposed
# through the ``SiteProxy`` model.
admin.site.unregister(Site)
admin.site.register(SiteProxy, SiteAdmin)
class ApplicationForm(forms.ModelForm):
    """Model form for ``Application`` whose ``name`` is chosen from the local apps."""

    # Starts empty; the real choices are assigned per instance in ``__init__``.
    name = forms.ChoiceField(choices=[])

    class Meta:
        model = Application
        fields = "__all__"

    def __init__(self, *args, **kwargs):
        """Initialise the form and fill the ``name`` choices from the local apps."""
        super().__init__(*args, **kwargs)
        name_field = self.fields["name"]
        name_field.choices = get_local_app_choices()
class ApplicationModuleInline(admin.TabularInline):
    """Tabular inline listing the ``Module`` rows linked through ``application``."""

    model = Module
    fk_name = "application"
    # No blank extra rows by default.
    extra = 0
@admin.register(Application)
class ApplicationAdmin(EntityModelAdmin):
    """Admin for ``Application`` with its modules edited inline."""

    form = ApplicationForm
    inlines = [ApplicationModuleInline]
    list_display = ("name", "app_verbose_name", "description", "installed")
    readonly_fields = ("installed",)

    @admin.display(description="Verbose name")
    def app_verbose_name(self, obj):
        """Column showing the object's ``verbose_name``."""
        verbose_name = obj.verbose_name
        return verbose_name

    @admin.display(boolean=True)
    def installed(self, obj):
        """Boolean column mirroring the object's ``installed`` attribute."""
        is_installed = obj.installed
        return is_installed
class LandingInline(admin.TabularInline):
    """Tabular inline for ``Landing`` rows, each with a link to its change page."""

    model = Landing
    fields = ("path", "label", "enabled", "track_leads")
    show_change_link = True
    # No blank extra rows by default.
    extra = 0
@admin.register(Landing)
class LandingAdmin(EntityModelAdmin):
    """Admin for ``Landing`` with filters and search across its module relations."""

    fields = ("module", "path", "label", "enabled", "track_leads", "description")
    list_display = ("label", "path", "module", "enabled", "track_leads")
    list_filter = (
        "enabled",
        "track_leads",
        "module__node_role",
        "module__application",
    )
    search_fields = (
        "label",
        "path",
        "description",
        "module__path",
        "module__application__name",
        "module__node_role__name",
    )
    # Fetch the related rows together with the list query.
    list_select_related = ("module", "module__application", "module__node_role")
@admin.register(Module)
class ModuleAdmin(EntityModelAdmin):
    """Admin for ``Module`` with inline landings and a "reload defaults" view."""

    change_list_template = "admin/pages/module/change_list.html"
    list_display = ("application", "node_role", "path", "menu", "is_default")
    list_filter = ("node_role", "application")
    fields = ("node_role", "application", "path", "menu", "is_default", "favicon")
    inlines = [LandingInline]

    def get_urls(self):
        """Put the reload route ahead of the default admin routes."""
        reload_route = path(
            "reload-default-modules/",
            self.admin_site.admin_view(self.reload_default_modules_view),
            name="pages_module_reload_default_modules",
        )
        return [reload_route] + super().get_urls()

    def reload_default_modules_view(self, request):
        """Admin view: restore the default modules on POST and report a summary.

        Any other method just redirects back. The message depends on whether
        any roles were processed and whether anything was created or updated.
        """
        if request.method == "POST":
            summary = restore_default_modules(Application, Module, Landing, NodeRole)
            role_count = summary.roles_processed
            if role_count == 0:
                level = messages.WARNING
                message = _(
                    "No default modules were reloaded because the required node roles are missing."
                )
            elif summary.has_changes:
                level = messages.SUCCESS
                # The ngettext calls stay literal so the strings remain
                # discoverable for translation.
                fragments: list[str] = []
                modules_created = summary.modules_created
                if modules_created:
                    fragments.append(
                        ngettext(
                            "%(count)d module created",
                            "%(count)d modules created",
                            modules_created,
                        )
                        % {"count": modules_created}
                    )
                modules_updated = summary.modules_updated
                if modules_updated:
                    fragments.append(
                        ngettext(
                            "%(count)d module updated",
                            "%(count)d modules updated",
                            modules_updated,
                        )
                        % {"count": modules_updated}
                    )
                landings_created = summary.landings_created
                if landings_created:
                    fragments.append(
                        ngettext(
                            "%(count)d landing created",
                            "%(count)d landings created",
                            landings_created,
                        )
                        % {"count": landings_created}
                    )
                landings_updated = summary.landings_updated
                if landings_updated:
                    fragments.append(
                        ngettext(
                            "%(count)d landing updated",
                            "%(count)d landings updated",
                            landings_updated,
                        )
                        % {"count": landings_updated}
                    )
                details = "; ".join(fragments)
                if details:
                    message = _(
                        "Reloaded default modules for %(roles)d role(s). %(details)s."
                    ) % {"roles": role_count, "details": details}
                else:
                    message = _(
                        "Reloaded default modules for %(roles)d role(s)."
                    ) % {"roles": role_count}
            else:
                level = messages.INFO
                message = _(
                    "Default modules are already up to date for %(roles)d role(s)."
                ) % {"roles": role_count}
            self.message_user(request, message, level)
        return redirect("..")
@admin.register(LandingLead)
class LandingLeadAdmin(EntityModelAdmin):
    """Admin for ``LandingLead``; only ``status`` and ``assign_to`` are editable."""

    date_hierarchy = "created_on"
    ordering = ("-created_on",)
    list_display = (
        "landing_label",
        "landing_path",
        "status",
        "user",
        "referer_display",
        "created_on",
    )
    list_filter = (
        "status",
        "landing__module__node_role",
        "landing__module__application",
    )
    list_select_related = ("landing", "landing__module", "landing__module__application")
    search_fields = (
        "landing__label",
        "landing__path",
        "referer",
        "path",
        "user__username",
        "user__email",
    )
    fields = (
        "landing",
        "user",
        "path",
        "referer",
        "user_agent",
        "ip_address",
        "status",
        "assign_to",
        "created_on",
    )
    readonly_fields = (
        "landing",
        "user",
        "path",
        "referer",
        "user_agent",
        "ip_address",
        "created_on",
    )

    def changelist_view(self, request, extra_context=None):
        """Render the change list, warning first when leads are not being recorded."""
        leads_recorded = landing_leads_supported()
        if not leads_recorded:
            warning_text = _(
                "Landing leads are not being recorded because Celery is not running on this node."
            )
            self.message_user(request, warning_text, messages.WARNING)
        return super().changelist_view(request, extra_context=extra_context)

    @admin.display(description=_("Landing"), ordering="landing__label")
    def landing_label(self, obj):
        """Column showing the label of the lead's landing."""
        landing = obj.landing
        return landing.label

    @admin.display(description=_("Path"), ordering="landing__path")
    def landing_path(self, obj):
        """Column showing the path of the lead's landing."""
        landing = obj.landing
        return landing.path

    @admin.display(description=_("Referrer"))
    def referer_display(self, obj):
        """Column showing the referer, or an empty string when it is falsy."""
        referer = obj.referer
        if referer:
            return referer
        return ""
@admin.register(RoleLanding)
class RoleLandingAdmin(EntityModelAdmin):
    """Admin for ``RoleLanding``, which pairs a target with a landing."""

    fields = ("node_role", "security_group", "user", "priority", "landing")
    list_display = (
        "target_display",
        "landing_path",
        "landing_label",
        "priority",
        "is_seed_data",
    )
    list_filter = ("node_role", "security_group")
    list_select_related = (
        "node_role",
        "security_group",
        "user",
        "landing",
        "landing__module",
    )
    search_fields = (
        "node_role__name",
        "security_group__name",
        "user__username",
        "landing__path",
        "landing__label",
    )

    @admin.display(description="Landing Path")
    def landing_path(self, obj):
        """Column showing the landing's path, or ``""`` when no landing is set."""
        if not obj.landing_id:
            return ""
        return obj.landing.path

    @admin.display(description="Landing Label")
    def landing_label(self, obj):
        """Column showing the landing's label, or ``""`` when no landing is set."""
        if not obj.landing_id:
            return ""
        return obj.landing.label

    @admin.display(description="Target", ordering="priority")
    def target_display(self, obj):
        """Column naming the first target that is set: role, then group, then user."""
        if obj.node_role_id:
            target = obj.node_role.name
        elif obj.security_group_id:
            target = obj.security_group.name
        elif obj.user_id:
            target = obj.user.get_username()
        else:
            target = ""
        return target
@admin.register(UserManual)
class UserManualAdmin(EntityModelAdmin):
    """Admin for ``UserManual`` using the dedicated admin form."""

    form = UserManualAdminForm
    search_fields = ("title", "slug", "description")
    list_filter = ("is_seed_data", "is_user_data")
    list_display = ("title", "slug", "languages", "is_seed_data", "is_user_data")
@admin.register(ViewHistory)
class ViewHistoryAdmin(EntityModelAdmin):
    """Read-only admin for ``ViewHistory`` with a traffic chart page and JSON feed."""

    date_hierarchy = "visited_at"
    ordering = ("-visited_at",)
    change_list_template = "admin/pages/viewhistory/change_list.html"
    actions = ["view_traffic_graph"]
    list_display = (
        "path",
        "status_code",
        "status_text",
        "method",
        "visited_at",
    )
    list_filter = ("method", "status_code")
    search_fields = ("path", "error_message", "view_name", "status_text")
    readonly_fields = (
        "path",
        "method",
        "status_code",
        "status_text",
        "error_message",
        "view_name",
        "visited_at",
    )

    def has_add_permission(self, request):
        """Rows can never be added through the admin."""
        return False

    @admin.action(description="View traffic graph")
    def view_traffic_graph(self, request, queryset):
        """Action that ignores the selection and opens the traffic graph page."""
        return redirect("admin:pages_viewhistory_traffic_graph")

    def get_urls(self):
        """Put the chart page and its data endpoint ahead of the default routes."""
        wrap = self.admin_site.admin_view
        chart_routes = [
            path(
                "traffic-graph/",
                wrap(self.traffic_graph_view),
                name="pages_viewhistory_traffic_graph",
            ),
            path(
                "traffic-data/",
                wrap(self.traffic_data_view),
                name="pages_viewhistory_traffic_data",
            ),
        ]
        return chart_routes + super().get_urls()

    def traffic_graph_view(self, request):
        """Render the chart page, passing it the URL of the data endpoint."""
        context = dict(self.admin_site.each_context(request))
        context["opts"] = self.model._meta
        context["title"] = "Public site traffic"
        context["chart_endpoint"] = reverse("admin:pages_viewhistory_traffic_data")
        return TemplateResponse(
            request,
            "admin/pages/viewhistory/traffic_graph.html",
            context,
        )

    def traffic_data_view(self, request):
        """Return the chart data as JSON for the number of days requested."""
        requested_days = self._resolve_requested_days(request)
        payload = self._build_chart_data(days=requested_days)
        return JsonResponse(payload)

    def _resolve_requested_days(self, request, default: int = 30) -> int:
        """Read ``?days=`` and clamp it to 1..90; fall back to ``default``.

        The default is returned untouched when the parameter is missing,
        empty or not an integer.
        """
        raw = request.GET.get("days")
        if raw is None or raw == "":
            return default
        try:
            requested = int(raw)
        except (TypeError, ValueError):
            return default
        if requested < 1:
            return 1
        if requested > 90:
            return 90
        return requested

    def _build_chart_data(self, days: int = 30, max_pages: int = 8) -> dict:
        """Build per-day visit counts for the most visited paths.

        Covers the ``days`` local dates ending today. Returns a dict with
        ``labels`` (ISO dates), ``datasets`` (one line per path, at most
        ``max_pages`` of them) and ``meta`` (start, end and the paths used).
        """
        last_day = timezone.localdate()
        first_day = last_day - timedelta(days=days - 1)
        # Half-open window: midnight of the first day up to midnight after the last.
        window_start = datetime.combine(first_day, time.min)
        window_end = datetime.combine(last_day + timedelta(days=1), time.min)
        if settings.USE_TZ:
            active_tz = timezone.get_current_timezone()
            window_start = timezone.make_aware(window_start, active_tz)
            window_end = timezone.make_aware(window_end, active_tz)
            by_day = TruncDate("visited_at", tzinfo=active_tz)
        else:
            by_day = TruncDate("visited_at")

        visits = ViewHistory.objects.filter(
            visited_at__gte=window_start, visited_at__lt=window_end
        )
        meta = {"start": first_day.isoformat(), "end": last_day.isoformat()}
        if not visits.exists():
            meta["pages"] = []
            return {"labels": [], "datasets": [], "meta": meta}

        ranked = visits.values("path").annotate(total=Count("id")).order_by("-total")
        top_pages = [entry["path"] for entry in list(ranked[:max_pages])]
        meta["pages"] = top_pages

        labels = [
            (first_day + timedelta(days=step)).isoformat() for step in range(days)
        ]
        daily_totals = (
            visits.filter(path__in=top_pages)
            .annotate(day=by_day)
            .values("day", "path")
            .order_by("day")
            .annotate(total=Count("id"))
        )

        # Start every (page, day) cell at zero so days without visits still plot.
        counts: dict[str, dict[str, int]] = {
            page: dict.fromkeys(labels, 0) for page in top_pages
        }
        for record in daily_totals:
            iso_day = record["day"].isoformat()
            per_day = counts.get(record["path"])
            if per_day is not None and iso_day in per_day:
                per_day[iso_day] = record["total"]

        palette = [
            "#1f77b4",
            "#ff7f0e",
            "#2ca02c",
            "#d62728",
            "#9467bd",
            "#8c564b",
            "#e377c2",
            "#7f7f7f",
            "#bcbd22",
            "#17becf",
        ]
        datasets = [
            {
                "label": page,
                "data": [counts[page][label] for label in labels],
                # Colours cycle through the palette.
                "borderColor": palette[position % len(palette)],
                "backgroundColor": palette[position % len(palette)],
                "fill": False,
                "tension": 0.3,
            }
            for position, page in enumerate(top_pages)
        ]
        return {"labels": labels, "datasets": datasets, "meta": meta}
@admin.register(UserStory)
class UserStoryAdmin(EntityModelAdmin):
    """Admin for ``UserStory`` feedback with an action that files GitHub issues.

    Most fields are read-only; ``status`` and ``assign_to`` are the ones left
    editable. Stories cannot be added through the admin.
    """

    date_hierarchy = "submitted_at"
    actions = ["create_github_issues"]
    list_display = (
        "name",
        "language_code",
        "rating",
        "path",
        "status",
        "submitted_at",
        "github_issue_display",
        "screenshot_display",
        "take_screenshot",
        "owner",
        "assign_to",
    )
    list_filter = ("rating", "status", "submitted_at", "take_screenshot")
    search_fields = (
        "name",
        "comments",
        "path",
        "language_code",
        "referer",
        "github_issue_url",
        "ip_address",
    )
    readonly_fields = (
        "name",
        "rating",
        "comments",
        "take_screenshot",
        "path",
        "user",
        "owner",
        "language_code",
        "referer",
        "user_agent",
        "ip_address",
        "created_on",
        "submitted_at",
        "github_issue_number",
        "github_issue_url",
        "screenshot_display",
    )
    ordering = ("-submitted_at",)
    fields = (
        "name",
        "rating",
        "comments",
        "take_screenshot",
        "screenshot_display",
        "path",
        "language_code",
        "user",
        "owner",
        "status",
        "assign_to",
        "referer",
        "user_agent",
        "ip_address",
        "created_on",
        "submitted_at",
        "github_issue_number",
        "github_issue_url",
    )

    @admin.display(description=_("GitHub issue"), ordering="github_issue_number")
    def github_issue_display(self, obj):
        """Column linking to the GitHub issue.

        With a URL, renders a link labelled ``#<number>`` (or the URL itself
        when there is no number). With only a number, renders ``#<number>``
        as plain text. Otherwise renders an empty string.
        """
        if obj.github_issue_url:
            label = (
                f"#{obj.github_issue_number}"
                if obj.github_issue_number is not None
                else obj.github_issue_url
            )
            return format_html(
                '<a href="{}" target="_blank" rel="noopener noreferrer">{}</a>',
                obj.github_issue_url,
                label,
            )
        if obj.github_issue_number is not None:
            return f"#{obj.github_issue_number}"
        return ""

    @admin.display(description=_("Screenshot"), ordering="screenshot")
    def screenshot_display(self, obj):
        """Column linking to the stored screenshot's admin page.

        Returns an empty string when no screenshot is attached, and the
        screenshot's ``path`` when its admin URL cannot be reversed.
        """
        if not obj.screenshot_id:
            return ""
        try:
            url = reverse("admin:nodes_contentsample_change", args=[obj.screenshot_id])
        except NoReverseMatch:
            return obj.screenshot.path
        # This is the last exit; the statement that used to follow this
        # return could never run and has been dropped.
        return format_html(
            '<a href="{}" target="_blank" rel="noopener noreferrer">{}</a>',
            url,
            _("View screenshot"),
        )

    @admin.action(description=_("Create GitHub issues"))
    def create_github_issues(self, request, queryset):
        """Create a GitHub issue for every selected story that lacks one.

        Stories that already have an issue URL, or for which no URL comes
        back, count as skipped. A failure is logged and shown as an error for
        that story only; when the failure is a missing GitHub token the error
        also links to the ``ReleaseManager`` change list. Totals are reported
        at the end.
        """
        created = 0
        skipped = 0
        for story in queryset:
            if story.github_issue_url:
                skipped += 1
                continue
            try:
                issue_url = story.create_github_issue()
            except Exception as exc:  # pragma: no cover - network/runtime errors
                logger.exception("Failed to create GitHub issue for UserStory %s", story.pk)
                message = _("Unable to create a GitHub issue for %(story)s: %(error)s") % {
                    "story": story,
                    "error": exc,
                }
                if (
                    isinstance(exc, RuntimeError)
                    and "GitHub token is not configured" in str(exc)
                ):
                    # Point the user at the place where credentials are managed.
                    try:
                        opts = ReleaseManager._meta
                        config_url = reverse(
                            f"{self.admin_site.name}:{opts.app_label}_{opts.model_name}_changelist"
                        )
                    except NoReverseMatch:  # pragma: no cover - defensive guard
                        config_url = None
                    if config_url:
                        message = format_html(
                            '{} <a href="{}">{}</a>',
                            message,
                            config_url,
                            _("Configure GitHub credentials."),
                        )
                self.message_user(
                    request,
                    message,
                    messages.ERROR,
                )
                continue
            if issue_url:
                created += 1
            else:
                skipped += 1

        if created:
            self.message_user(
                request,
                ngettext(
                    "Created %(count)d GitHub issue.",
                    "Created %(count)d GitHub issues.",
                    created,
                )
                % {"count": created},
                messages.SUCCESS,
            )
        if skipped:
            self.message_user(
                request,
                ngettext(
                    "Skipped %(count)d feedback item (issue already exists or was throttled).",
                    "Skipped %(count)d feedback items (issues already exist or were throttled).",
                    skipped,
                )
                % {"count": skipped},
                messages.INFO,
            )

    def has_add_permission(self, request):
        """Stories can never be added through the admin."""
        return False
def favorite_toggle(request, ct_id):
    """Create, update or remove the user's favorite for a content type.

    GET renders the confirmation form. POST removes the favorite when the
    ``remove`` field is sent for an existing one; otherwise it updates the
    existing favorite (saving only changed fields) or creates a new one, then
    redirects to ``?next=`` or the admin index.

    The ``next`` target comes from the query string, so it is only honoured
    when it points at this host (and at HTTPS when the request is secure).
    Anything else falls back to the admin index.

    Raises ``Http404`` when ``ct_id`` matches no content type.
    """
    # Imported here so the block carries its own dependency.
    from django.utils.http import url_has_allowed_host_and_scheme

    ct = get_object_or_404(ContentType, pk=ct_id)
    fav = Favorite.objects.filter(user=request.user, content_type=ct).first()

    next_url = request.GET.get("next")
    if next_url and not url_has_allowed_host_and_scheme(
        next_url,
        allowed_hosts={request.get_host()},
        require_https=request.is_secure(),
    ):
        # Refuse to bounce the user to an attacker-chosen location.
        next_url = None

    if request.method == "POST":
        if fav and request.POST.get("remove"):
            fav.delete()
            return redirect(next_url or "admin:index")

        label = request.POST.get("custom_label", "").strip()
        user_data = request.POST.get("user_data") == "on"
        priority_raw = request.POST.get("priority", "").strip()
        # A blank or non-numeric priority keeps the current one (0 for new).
        default_priority = fav.priority if fav else 0
        try:
            priority = int(priority_raw) if priority_raw else default_priority
        except (TypeError, ValueError):
            priority = default_priority

        if fav:
            update_fields = []
            if fav.custom_label != label:
                fav.custom_label = label
                update_fields.append("custom_label")
            if fav.user_data != user_data:
                fav.user_data = user_data
                update_fields.append("user_data")
            if fav.priority != priority:
                fav.priority = priority
                update_fields.append("priority")
            if update_fields:
                fav.save(update_fields=update_fields)
        else:
            Favorite.objects.create(
                user=request.user,
                content_type=ct,
                custom_label=label,
                user_data=user_data,
                priority=priority,
            )
        return redirect(next_url or "admin:index")

    return render(
        request,
        "admin/favorite_confirm.html",
        {
            "content_type": ct,
            "favorite": fav,
            "next": next_url,
            "initial_label": fav.custom_label if fav else "",
            "initial_priority": fav.priority if fav else 0,
            "is_checked": fav.user_data if fav else True,
        },
    )
def favorite_list(request):
    """List the user's favorites and, on POST, bulk-update flags and priorities.

    For each favorite, ``user_data`` follows whether its pk was posted under
    ``user_data``. The priority comes from ``priority_<pk>``: blank means 0,
    and a non-numeric value leaves the priority alone. Only changed fields
    are saved.
    """
    favorites = (
        Favorite.objects.filter(user=request.user)
        .select_related("content_type")
        .order_by("priority", "pk")
    )
    if request.method != "POST":
        return render(request, "admin/favorite_list.html", {"favorites": favorites})

    checked_ids = set(request.POST.getlist("user_data"))
    for favorite in favorites:
        changed = []

        is_checked = str(favorite.pk) in checked_ids
        if favorite.user_data != is_checked:
            favorite.user_data = is_checked
            changed.append("user_data")

        posted_priority = request.POST.get(f"priority_{favorite.pk}", "").strip()
        if not posted_priority:
            new_priority = 0
        else:
            try:
                new_priority = int(posted_priority)
            except (TypeError, ValueError):
                # Unparseable input: keep whatever is stored.
                new_priority = favorite.priority
        if favorite.priority != new_priority:
            favorite.priority = new_priority
            changed.append("priority")

        if changed:
            favorite.save(update_fields=changed)
    return redirect("admin:favorite_list")
def favorite_delete(request, pk):
    """Delete the requesting user's favorite ``pk`` and return to the list.

    Raises ``Http404`` when no such favorite belongs to the user.
    """
    get_object_or_404(Favorite, pk=pk, user=request.user).delete()
    return redirect("admin:favorite_list")
def favorite_clear(request):
    """Delete all of the requesting user's favorites and return to the list."""
    own_favorites = Favorite.objects.filter(user=request.user)
    own_favorites.delete()
    return redirect("admin:favorite_list")
def _read_log_tail(path: Path, limit: int) -> str:
    """Return the last ``limit`` lines of ``path`` with their newlines intact.

    The file is decoded as UTF-8. A bounded deque keeps only the trailing
    lines while the file is streamed, so the whole file is never held at once.
    """
    with path.open("r", encoding="utf-8") as stream:
        trailing_lines = deque(stream, maxlen=limit)
    return "".join(trailing_lines)
def log_viewer(request):
    """Render the admin log viewer, or send the selected log as a download.

    Query parameters: ``log`` (file name inside ``BASE_DIR/logs``), ``limit``
    (``20``, ``40``, ``100`` or ``all``; anything else means ``20``) and
    ``download=1`` to receive the file as an attachment. Only names found in
    the directory listing are accepted; hidden files are not listed.
    """
    logs_dir = Path(settings.BASE_DIR) / "logs"
    logs_exist = logs_dir.exists() and logs_dir.is_dir()
    if logs_exist:
        visible_names = (
            item.name
            for item in logs_dir.iterdir()
            if item.is_file() and not item.name.startswith(".")
        )
        available_logs = sorted(visible_names, key=str.lower)
    else:
        available_logs = []

    limit_options = [
        {"value": "20", "label": "20"},
        {"value": "40", "label": "40"},
        {"value": "100", "label": "100"},
        {"value": "all", "label": _("All")},
    ]
    allowed_limits = [option["value"] for option in limit_options]
    limit_choice = request.GET.get("limit", "20")
    if limit_choice not in allowed_limits:
        limit_choice = "20"
    limit_index = allowed_limits.index(limit_choice)

    selected_log = request.GET.get("log", "")
    download_requested = request.GET.get("download") == "1"
    log_content = ""
    log_error = ""

    if selected_log and selected_log not in available_logs:
        log_error = _("The requested log could not be found.")
    elif selected_log:
        selected_path = logs_dir / selected_log
        try:
            if download_requested:
                return FileResponse(
                    selected_path.open("rb"),
                    as_attachment=True,
                    filename=selected_log,
                )
            if limit_choice == "all":
                try:
                    log_content = selected_path.read_text(encoding="utf-8")
                except UnicodeDecodeError:
                    # Not valid UTF-8: retry with undecodable bytes replaced.
                    log_content = selected_path.read_text(
                        encoding="utf-8", errors="replace"
                    )
            else:
                try:
                    line_count = int(limit_choice)
                except (TypeError, ValueError):
                    line_count = 20
                    limit_choice = "20"
                    limit_index = allowed_limits.index(limit_choice)
                try:
                    log_content = _read_log_tail(selected_path, line_count)
                except UnicodeDecodeError:
                    # Same tail read, with undecodable bytes replaced.
                    with selected_path.open(
                        "r", encoding="utf-8", errors="replace"
                    ) as stream:
                        log_content = "".join(deque(stream, maxlen=line_count))
        except OSError as exc:  # pragma: no cover - filesystem edge cases
            logger.warning("Unable to read log file %s", selected_path, exc_info=exc)
            log_error = _(
                "The log file could not be read. Check server permissions and try again."
            )

    if logs_exist and available_logs:
        log_notice = ""
    elif logs_exist:
        log_notice = _("No log files were found in %(path)s.") % {"path": logs_dir}
    else:
        log_notice = _("The logs directory could not be found at %(path)s.") % {
            "path": logs_dir,
        }

    context = {
        **admin.site.each_context(request),
        "title": _("Log viewer"),
        "available_logs": available_logs,
        "selected_log": selected_log,
        "log_content": log_content,
        "log_error": log_error,
        "log_notice": log_notice,
        "logs_directory": logs_dir,
        "log_limit_options": limit_options,
        "log_limit_index": limit_index,
        "log_limit_choice": limit_choice,
        "log_limit_label": limit_options[limit_index]["label"],
    }
    return TemplateResponse(request, "admin/log_viewer.html", context)
def get_admin_urls(original_get_urls):
def get_urls():
urls = original_get_urls()
my_urls = [
path(
"logs/viewer/",
admin.site.admin_view(log_viewer),
name="log_viewer",
),
path(
"favorites/
return get_urls
# Replace the admin site's URL builder with the wrapper returned by
# ``get_admin_urls``, which receives the original builder as its argument.
admin.site.get_urls = get_admin_urls(admin.site.get_urls)