from future import annotations
from functools import lru_cache from pathlib import Path from io import BytesIO from zipfile import ZipFile import json
from django.apps import apps from django.conf import settings from django.contrib import admin, messages from django.contrib.auth import get_user_model from django.contrib.auth.signals import user_logged_in from django.core.management import call_command from django.db.models.signals import post_save from django.dispatch import receiver from django.http import HttpResponse, HttpResponseRedirect from django.template.response import TemplateResponse from django.urls import path, reverse from django.utils.functional import LazyObject from django.utils.translation import gettext as _
from .entity import Entity
def _data_root(user=None) -> Path: path = Path(getattr(user, "data_path", "") or Path(settings.BASE_DIR) / "data") path.mkdir(parents=True, exist_ok=True) return path
def _username_for(user) -> str: username = "" if hasattr(user, "get_username"): username = user.get_username() if not username and hasattr(user, "username"): username = user.username if not username and getattr(user, "pk", None): username = str(user.pk) return username
def _user_allows_user_data(user) -> bool: if not user: return False username = _username_for(user) UserModel = get_user_model() system_username = getattr(UserModel, "SYSTEM_USERNAME", "") if system_username and username == system_username: return True return not getattr(user, "is_profile_restricted", False)
def _data_dir(user) -> Path: username = _username_for(user) if not username: raise ValueError("Cannot determine username for fixture directory") path = _data_root(user) / username path.mkdir(parents=True, exist_ok=True) return path
def fixture_path(user, instance) -> Path: model_meta = instance._meta.concrete_model._meta filename = f"{model_meta.app_label}{model_meta.model_name}_{instance.pk}.json" return _data_dir(user) / filename
_SEED_FIXTURE_IGNORED_FIELDS = {"is_seed_data", "is_deleted", "is_user_data"}
@lru_cache(maxsize=1) def _seed_fixture_index(): base = Path(settings.BASE_DIR) index: dict[str, dict[str, object]] = {} for path in base.glob("*/fixtures/.json"): try: data = json.loads(path.read_text(encoding="utf-8")) except Exception: continue if not isinstance(data, list) or not data: continue obj = data[0] if not isinstance(obj, dict): continue label = obj.get("model") if not isinstance(label, str): continue fields = obj.get("fields") or {} if not isinstance(fields, dict): fields = {} comparable_fields = { key: value for key, value in fields.items() if key not in _SEED_FIXTURE_IGNORED_FIELDS } pk = obj.get("pk") entries = index.setdefault(label, {"pk": {}, "fields": []}) pk_index = entries.setdefault("pk", {}) field_index = entries.setdefault("fields", []) if pk is not None: pk_index[pk] = path if comparable_fields: field_index.append((comparable_fields, path)) return index
def _seed_fixture_path(instance, *, index=None) -> Path | None: label = f"{instance._meta.app_label}.{instance._meta.model_name}" fixture_index = index or _seed_fixture_index() entries = fixture_index.get(label) if not entries: return None pk = getattr(instance, "pk", None) pk_index = entries.get("pk", {}) if pk is not None: path = pk_index.get(pk) if path is not None: return path for comparable_fields, path in entries.get("fields", []): match = True if not isinstance(comparable_fields, dict): continue for field_name, value in comparable_fields.items(): if not hasattr(instance, field_name): match = False break if getattr(instance, field_name) != value: match = False break if match: return path return None
def _coerce_user(candidate, user_model): if candidate is None: return None if isinstance(candidate, user_model): return candidate if isinstance(candidate, LazyObject): try: candidate._setup() except Exception: return None return _coerce_user(candidate._wrapped, user_model) return None
def _select_fixture_user(candidate, user_model): user = _coerce_user(candidate, user_model) visited: set[int] = set() while user is not None: identifier = user.pk or id(user) if identifier in visited: break visited.add(identifier) username = _username_for(user) admin_username = getattr(user_model, "ADMIN_USERNAME", "") if admin_username and username == admin_username: try: delegate = getattr(user, "operate_as", None) except user_model.DoesNotExist: delegate = None else: delegate = _coerce_user(delegate, user_model) if delegate is not None and delegate is not user: user = delegate continue if _user_allows_user_data(user): return user try: delegate = getattr(user, "operate_as", None) except user_model.DoesNotExist: delegate = None user = _coerce_user(delegate, user_model) return None
def _resolve_fixture_user(instance, fallback=None): UserModel = get_user_model() owner = getattr(instance, "user", None) selected = _select_fixture_user(owner, UserModel) if selected is not None: return selected if hasattr(instance, "owner"): try: owner_value = instance.owner except Exception: owner_value = None else: selected = _select_fixture_user(owner_value, UserModel) if selected is not None: return selected selected = _select_fixture_user(fallback, UserModel) if selected is not None: return selected return fallback
def dump_user_fixture(instance, user=None) -> None: model = instance._meta.concrete_model UserModel = get_user_model() if issubclass(UserModel, Entity) and isinstance(instance, UserModel): return target_user = user or _resolve_fixture_user(instance) if target_user is None: return allow_user_data = _user_allows_user_data(target_user) if not allow_user_data: is_user_data = getattr(instance, "is_user_data", False) if not is_user_data and instance.pk: stored_flag = ( type(instance) .all_objects.filter(pk=instance.pk) .values_list("is_user_data", flat=True) .first() ) is_user_data = bool(stored_flag) if not is_user_data: return meta = model._meta path = _fixture_path(target_user, instance) call_command( "dumpdata", f"{meta.app_label}.{meta.model_name}", indent=2, pks=str(instance.pk), output=str(path), use_natural_foreign_keys=True, )
def delete_user_fixture(instance, user=None) -> None: target_user = user or resolve_fixture_user(instance) filename = ( f"{instance._meta.app_label}{instance.meta.model_name}{instance.pk}.json" )
def _remove_for_user(candidate) -> None:
if candidate is None:
return
base_path = Path(
getattr(candidate, "data_path", "") or Path(settings.BASE_DIR) / "data"
)
username = _username_for(candidate)
if not username:
return
user_dir = base_path / username
if user_dir.exists():
(user_dir / filename).unlink(missing_ok=True)
if target_user is not None:
_remove_for_user(target_user)
return
root = Path(settings.BASE_DIR) / "data"
if root.exists():
(root / filename).unlink(missing_ok=True)
for path in root.iterdir():
if path.is_dir():
(path / filename).unlink(missing_ok=True)
UserModel = get_user_model()
manager = getattr(UserModel, "all_objects", UserModel._default_manager)
for candidate in manager.all():
data_path = getattr(candidate, "data_path", "")
if not data_path:
continue
base_path = Path(data_path)
if not base_path.exists():
continue
username = _username_for(candidate)
if not username:
continue
user_dir = base_path / username
if user_dir.exists():
(user_dir / filename).unlink(missing_ok=True)
def _mark_fixture_user_data(path: Path) -> None: try: content = path.read_text(encoding="utf-8") except UnicodeDecodeError: try: content = path.read_bytes().decode("latin-1") except Exception: return except Exception: return try: data = json.loads(content) except Exception: return if not isinstance(data, list): return for obj in data: label = obj.get("model") if not label: continue try: model = apps.get_model(label) except LookupError: continue if not issubclass(model, Entity): continue pk = obj.get("pk") if pk is None: continue model.all_objects.filter(pk=pk).update(is_user_data=True)
def _fixture_targets_installed_apps(data) -> bool:
"""Return True when data only targets installed apps and models."""
if not isinstance(data, list):
return True
labels = {
obj.get("model")
for obj in data
if isinstance(obj, dict) and obj.get("model")
}
for label in labels:
if not isinstance(label, str):
continue
if "." not in label:
continue
app_label, model_name = label.split(".", 1)
if not app_label or not model_name:
continue
if not apps.is_installed(app_label):
return False
try:
apps.get_model(label)
except LookupError:
return False
return True
def _load_fixture(path: Path, , mark_user_data: bool = True) -> bool: """Load a fixture from path* and optionally flag loaded entities."""
text = None
try:
text = path.read_text(encoding="utf-8")
except UnicodeDecodeError:
try:
text = path.read_bytes().decode("latin-1")
except Exception:
return False
path.write_text(text, encoding="utf-8")
except Exception:
# Continue without cached text so ``call_command`` can surface the
# underlying error just as before.
pass
if text is not None:
try:
data = json.loads(text)
except Exception:
data = None
else:
if isinstance(data, list):
if not data:
path.unlink(missing_ok=True)
return False
if not _fixture_targets_installed_apps(data):
return False
try:
call_command("loaddata", str(path), ignorenonexistent=True)
except Exception:
return False
if mark_user_data:
_mark_fixture_user_data(path)
return True
def fixture_sort_key(path: Path) -> tuple[int, str]: parts = path.name.split("", 2) model_part = parts[1].lower() if len(parts) >= 2 else "" is_user = model_part == "user" return (0 if is_user else 1, path.name)
def is_user_fixture(path: Path) -> bool: parts = path.name.split("", 2) return len(parts) >= 2 and parts[1].lower() == "user"
def _get_request_ip(request) -> str:
"""Return the best-effort client IP for request."""
if request is None:
return ""
meta = getattr(request, "META", None)
if not getattr(meta, "get", None):
return ""
forwarded = meta.get("HTTP_X_FORWARDED_FOR")
if forwarded:
for value in str(forwarded).split(","):
candidate = value.strip()
if candidate:
return candidate
remote = meta.get("REMOTE_ADDR")
if remote:
return str(remote).strip()
return ""
_shared_fixtures_loaded = False
def load_shared_user_fixtures(, force: bool = False, user=None) -> None: global _shared_fixtures_loaded if _shared_fixtures_loaded and not force: return root = _data_root(user) paths = sorted(root.glob(".json"), key=_fixture_sort_key) for path in paths: if _is_user_fixture(path): continue _load_fixture(path) _shared_fixtures_loaded = True
def load_user_fixtures(user, , include_shared: bool = False) -> None: if include_shared: load_shared_user_fixtures(user=user) paths = sorted(_data_dir(user).glob(".json"), key=_fixture_sort_key) for path in paths: if _is_user_fixture(path): continue _load_fixture(path)
@receiver(user_logged_in) def _on_login(sender, request, user, **kwargs): load_user_fixtures(user, include_shared=not _shared_fixtures_loaded)
if not (
getattr(user, "is_staff", False) or getattr(user, "is_superuser", False)
):
return
username = _username_for(user) or "unknown"
ip_address = _get_request_ip(request) or "unknown"
from nodes.models import NetMessage
NetMessage.broadcast(subject=f"login {username}", body=f"@ {ip_address}")
@receiver(post_save, sender=settings.AUTH_USER_MODEL) def _on_user_created(sender, instance, created, **kwargs): if created: load_shared_user_fixtures(force=True, user=instance) load_user_fixtures(instance)
class UserDatumAdminMixin(admin.ModelAdmin): """Mixin adding a User Datum checkbox to change forms."""
def render_change_form(
self, request, context, add=False, change=False, form_url="", obj=None
):
supports_user_datum = issubclass(self.model, Entity) or getattr(
self.model, "supports_user_datum", False
)
supports_seed_datum = issubclass(self.model, Entity) or getattr(
self.model, "supports_seed_datum", supports_user_datum
)
context["show_user_datum"] = supports_user_datum
context["show_seed_datum"] = supports_seed_datum
context["show_save_as_copy"] = (
issubclass(self.model, Entity)
or getattr(self.model, "supports_save_as_copy", False)
or hasattr(self.model, "clone")
)
if obj is not None:
context["is_user_datum"] = getattr(obj, "is_user_data", False)
context["is_seed_datum"] = getattr(obj, "is_seed_data", False)
else:
context["is_user_datum"] = False
context["is_seed_datum"] = False
return super().render_change_form(request, context, add, change, form_url, obj)
class EntityModelAdmin(UserDatumAdminMixin, admin.ModelAdmin):
"""ModelAdmin base class for :class:Entity models."""
change_form_template = "admin/user_datum_change_form.html"
def save_model(self, request, obj, form, change):
copied = "_saveacopy" in request.POST
if copied:
obj = obj.clone() if hasattr(obj, "clone") else obj
obj.pk = None
form.instance = obj
try:
super().save_model(request, obj, form, False)
except Exception:
messages.error(
request,
_("Unable to save copy. Adjust unique fields and try again."),
)
raise
else:
super().save_model(request, obj, form, change)
if isinstance(obj, Entity):
type(obj).all_objects.filter(pk=obj.pk).update(
is_seed_data=obj.is_seed_data, is_user_data=obj.is_user_data
)
if copied:
return
if getattr(self, "_skip_entity_user_datum", False):
return
target_user = _resolve_fixture_user(obj, request.user)
allow_user_data = _user_allows_user_data(target_user)
if request.POST.get("_user_datum") == "on":
if allow_user_data:
if not obj.is_user_data:
type(obj).all_objects.filter(pk=obj.pk).update(is_user_data=True)
obj.is_user_data = True
dump_user_fixture(obj, target_user)
handler = getattr(self, "user_datum_saved", None)
if callable(handler):
handler(request, obj)
path = _fixture_path(target_user, obj)
self.message_user(request, f"User datum saved to {path}")
else:
if obj.is_user_data:
type(obj).all_objects.filter(pk=obj.pk).update(is_user_data=False)
obj.is_user_data = False
delete_user_fixture(obj, target_user)
messages.warning(
request,
_("User data is not available for this account."),
)
elif obj.is_user_data:
type(obj).all_objects.filter(pk=obj.pk).update(is_user_data=False)
obj.is_user_data = False
delete_user_fixture(obj, target_user)
handler = getattr(self, "user_datum_deleted", None)
if callable(handler):
handler(request, obj)
def patch_admin_user_datum() -> None: """Mixin all registered entity admin classes and future registrations."""
if getattr(admin.site, "_user_datum_patched", False):
return
def _patched(admin_class):
template = (
getattr(admin_class, "change_form_template", None)
or EntityModelAdmin.change_form_template
)
return type(
f"Patched{admin_class.__name__}",
(EntityModelAdmin, admin_class),
{"change_form_template": template},
)
for model, model_admin in list(admin.site._registry.items()):
if issubclass(model, Entity) and not isinstance(model_admin, EntityModelAdmin):
admin.site.unregister(model)
admin.site.register(model, _patched(model_admin.__class__))
original_register = admin.site.register
def register(model_or_iterable, admin_class=None, **options):
models = model_or_iterable
if not isinstance(models, (list, tuple, set)):
models = [models]
admin_class = admin_class or admin.ModelAdmin
patched_class = admin_class
for model in models:
if issubclass(model, Entity) and not issubclass(
patched_class, EntityModelAdmin
):
patched_class = _patched(patched_class)
return original_register(model_or_iterable, patched_class, **options)
admin.site.register = register
admin.site._user_datum_patched = True
def _iter_entity_admin_models():
"""Yield registered :class:Entity admin models without proxy duplicates."""
seen: set[type] = set()
for model, model_admin in admin.site._registry.items():
if not issubclass(model, Entity):
continue
concrete_model = model._meta.concrete_model
if concrete_model in seen:
continue
seen.add(concrete_model)
yield model, model_admin
def seed_data_view(request): sections = [] fixture_index = _seed_fixture_index() for model, model_admin in _iter_entity_admin_models(): objs = model.objects.filter(is_seed_data=True) if not objs.exists(): continue items = [] for obj in objs: url = reverse( f"admin:{obj._meta.app_label}{obj.meta.model_name}_change", args=[obj.pk], ) fixture = _seed_fixture_path(obj, index=fixture_index) items.append({"url": url, "label": str(obj), "fixture": fixture}) sections.append({"opts": model._meta, "items": items}) context = admin.site.each_context(request) context.update({"title": ("Seed Data"), "sections": sections}) return TemplateResponse(request, "admin/data_list.html", context)
def user_data_view(request): sections = [] for model, model_admin in _iter_entity_admin_models(): objs = model.objects.filter(is_user_data=True) if not objs.exists(): continue items = [] for obj in objs: url = reverse( f"admin:{obj._meta.app_label}{obj.meta.model_name}_change", args=[obj.pk], ) fixture = _fixture_path(request.user, obj) items.append({"url": url, "label": str(obj), "fixture": fixture}) sections.append({"opts": model._meta, "items": items}) context = admin.site.each_context(request) context.update( {"title": ("User Data"), "sections": sections, "import_export": True} ) return TemplateResponse(request, "admin/data_list.html", context)
def user_data_export(request): buffer = BytesIO() with ZipFile(buffer, "w") as zf: for path in _data_dir(request.user).glob("*.json"): zf.write(path, arcname=path.name) buffer.seek(0) response = HttpResponse(buffer.getvalue(), content_type="application/zip") response["Content-Disposition"] = ( f"attachment; filename=user_data{request.user.pk}.zip" ) return response
def _user_data_import(request): if request.method == "POST" and request.FILES.get("data_zip"): with ZipFile(request.FILES["data_zip"]) as zf: paths = [] data_dir = _data_dir(request.user) for name in zf.namelist(): if not name.endswith(".json"): continue content = zf.read(name) target = data_dir / name with target.open("wb") as f: f.write(content) paths.append(target) if paths: for path in paths: _load_fixture(path) return HttpResponseRedirect(reverse("admin:user_data"))
def patch_admin_user_data_views() -> None: original_get_urls = admin.site.get_urls
def get_urls():
urls = original_get_urls()
custom = [
path(
"seed-data/", admin.site.admin_view(_seed_data_view), name="seed_data"
),
path(
"user-data/", admin.site.admin_view(_user_data_view), name="user_data"
),
path(
"user-data/export/",
admin.site.admin_view(_user_data_export),
name="user_data_export",
),
path(
"user-data/import/",
admin.site.admin_view(_user_data_import),
name="user_data_import",
),
]
return custom + urls
admin.site.get_urls = get_urls