From 871dd020e3183aab2075e28baf2117c4bc6154eb Mon Sep 17 00:00:00 2001 From: Matthew Fricke Date: Wed, 27 May 2026 14:45:29 -0600 Subject: [PATCH] Initial Commit --- README.md | 92 +++++ __init__.py | 1 + admin.py | 180 ++++++++++ admin_audit.py | 89 +++++ allocation_workflow_audit.py | 66 ++++ apps.py | 18 + backfill.py | 635 +++++++++++++++++++++++++++++++++++ middleware.py | 26 ++ models.py | 303 +++++++++++++++++ project_review_audit.py | 116 +++++++ resolvers.py | 174 ++++++++++ signals.py | 538 +++++++++++++++++++++++++++++ tests.py | 574 +++++++++++++++++++++++++++++++ utils.py | 196 +++++++++++ 14 files changed, 3008 insertions(+) create mode 100644 README.md create mode 100644 __init__.py create mode 100644 admin.py create mode 100644 admin_audit.py create mode 100644 allocation_workflow_audit.py create mode 100644 apps.py create mode 100644 backfill.py create mode 100644 middleware.py create mode 100644 models.py create mode 100644 project_review_audit.py create mode 100644 resolvers.py create mode 100644 signals.py create mode 100644 tests.py create mode 100644 utils.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..f4fed47 --- /dev/null +++ b/README.md @@ -0,0 +1,92 @@ +# CMMC Audit Plugin + +The in-tree package is named `coldfront.plugins.cmmc_audit`, and the user-facing Django admin label is **CMMC Audit**. + +This plugin records append-only operational evidence for ColdFront workflows. It keeps raw forensic data such as `old_values`, `new_values`, target type/id, request path, IP address, source, and source id while adding CMMC-oriented admin columns for reviewers. + +## Evidence Coverage + +AC evidence: + +- Project membership changes: users added, removed, activated, and role changes. +- Project archival, deletion, status changes, and PI changes. +- PI and admin privilege changes. +- Allocation requests, approvals/status changes, renewals, and disable/revoke outcomes. +- Direct allocation renewal requests from `AllocationRenewView.post()`. + +AU evidence: + +- Append-only audit events with event time, actor, request path, IP address, and target. +- Generic Django admin add/change/delete actions. +- Project review submission/completion/status changes. +- Best-effort historical reconstruction from Django admin logs and selected ColdFront history tables. + +CM evidence: + +- Resource creation, update, and deletion. +- Resource changes preserve raw old/new values for configuration review. + +## Project Review Workflow Coverage + +Discovered ColdFront project review hooks: + +- `Project.force_review` and `Project.requires_review` are fields on `coldfront.core.project.models.Project`. `Project.needs_review` is computed; it returns true when the project is forced, when annual review is enabled and required, and when the project is older than the review interval. +- PI/manager submission is handled directly by `ProjectReviewView.post()` at `/project//review/`. It creates a `ProjectReview` with status `Pending` and clears `Project.force_review`. +- Staff completion is handled directly by `ProjectReviewCompleteView.get()` at `/project/project-review-complete//`. It changes the `ProjectReview` status to `Completed`. +- Staff request-for-updates is implemented as `ProjectReviewEmailView`; it sends email but does not change `ProjectReview` status. +- There are no custom ColdFront project-review signals in this checkout. + +Implemented semantic audit events: + +- `project_review_forced`: emitted when `Project.force_review` changes from false to true. +- `project_review_submitted`: emitted from the PI/manager review submission view after a `ProjectReview` is created. +- `project_review_completed`: emitted from the staff completion view after status changes to `Completed`. +- `project_review_status_changed`: emitted for non-workflow `ProjectReview.status` changes, such as admin/manual edits. + +Allocation workflow hooks: + +- Allocation request/approval/disable events use ColdFront allocation signals where available. +- Direct allocation renewal requests are captured by wrapping `AllocationRenewView.post()` because that view changes status to `Renewal Requested` without emitting an allocation change request signal. + +## Limitations + +- Historical reconstruction is best effort. It cannot perfectly reconstruct user intent, free-form comments, IP addresses, request paths, or actions that were not stored in existing Django admin logs or ColdFront history tables. +- Django admin log reconstruction does not invent old/new values. When direction is unknown, messages say so explicitly. +- Annual review due state is computed by `Project.needs_review`; no database write or signal is emitted when a project merely becomes due by age. +- Request-for-updates email does not alter review state, so it is not logged as a review status event. +- Slurm partition restrictions and external account-disable actions are only captured if represented by ColdFront resource/allocation changes in this first pass. +- The plugin wraps selected review view methods at runtime to avoid modifying ColdFront core. + +## Historical Reconstruction + +Historical reconstruction is implemented as an explicit management command. It is never run at startup or from `AppConfig.ready()`. + +Recommended sequence: + +```bash +python manage.py backfill_cmmc_audit --dry-run +python manage.py backfill_cmmc_audit --commit +``` + +The command records a `BackfillRun` named `initial_historical_backfill` when committed. A second `--commit` exits cleanly after a completed run. Use `--commit --force` to rerun; duplicate protection checks `source`, `source_id`, and `action`, so previously reconstructed events are skipped. + +Reconstructed entries are marked with `is_reconstructed=True`, keep the original event time when available, and include `source`/`source_id` for forensic traceability. Django admin log entries use `source='django_admin_log'`. ColdFront history entries use `source='coldfront_history'`. + +Audit rows are append-only and the backfill command does not rewrite existing reconstructed rows. During development testing, reset only the reconstructed rows and run marker before backfilling again if you need to validate changed reconstruction output: + +```python +from coldfront.plugins.cmmc_audit.models import AuditEvent, BackfillRun + +AuditEvent.objects.filter(is_reconstructed=True).delete() +BackfillRun.objects.filter(name="initial_historical_backfill").delete() +``` + +The command reconstructs: + +- Generic admin add/change/delete events from `django.contrib.admin.models.LogEntry`. +- Conservative semantic admin-log events for PI status changes with unknown direction, project deletion, project user role changes, and resource create/change/delete. +- Selected ColdFront history events when adjacent records expose clear old/new state: project archival/status/PI changes, allocation status changes, allocation end-date increases, project user role changes, and resource create/change/delete. + +## Development Validation + +Smoke tests should use Django's test database or an explicit rollback/cleanup block. Do not leave fixture users, projects, resources, allocations, or audit rows in a normal development database; those rows appear in the same evidence table as real operational events. diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..b4a7ed1 --- /dev/null +++ b/__init__.py @@ -0,0 +1 @@ +default_app_config = "coldfront.plugins.cmmc_audit.apps.CmmcAuditConfig" diff --git a/admin.py b/admin.py new file mode 100644 index 0000000..5c34aec --- /dev/null +++ b/admin.py @@ -0,0 +1,180 @@ +from django.contrib import admin +from django.utils.html import format_html + +from .models import AuditEvent, BackfillRun + + +class ActorTypeFilter(admin.SimpleListFilter): + title = "Actor type" + parameter_name = "actor_type" + + def lookups(self, request, model_admin): + return ( + ("named", "Named actor"), + ("programmatic", "Programmatic"), + ) + + def queryset(self, request, queryset): + if self.value() == "named": + return queryset.filter(actor__isnull=False) + if self.value() == "programmatic": + return queryset.filter(actor__isnull=True) + return queryset + + +@admin.register(AuditEvent) +class AuditEventAdmin(admin.ModelAdmin): + list_display = ( + "event_time", + "actor_display", + "action_detail_display", + "target_summary", + "evidence_display", + "reconstructed_display", + "message_display", + ) + + list_filter = ( + "action", + ActorTypeFilter, + "evidence_category", + "source", + "is_reconstructed", + "actor", + "event_time", + "target_type", + ) + + search_fields = ( + 'actor__username', + 'actor__email', + 'target_repr', + 'target_id', + 'message', + "source_id", + ) + + readonly_fields = ( + "id", + "timestamp", + "event_time", + "actor", + "action", + "evidence_category", + "control_family", + "target_type", + "target_id", + "target_repr", + "old_values_display", + "new_values_display", + "old_values", + "new_values", + "message", + "source", + "source_id", + "is_reconstructed", + "request_path", + "ip_address", + ) + + date_hierarchy = "event_time" + + def message_display(self, obj): + return obj.message or "" + + message_display.short_description = "Message" + + def actor_display(self, obj): + return obj.actor_display + + actor_display.short_description = "Actor" + actor_display.admin_order_field = "actor" + + def action_display(self, obj): + return obj.action_display + + action_display.short_description = "Action" + action_display.admin_order_field = "action" + + def action_detail_display(self, obj): + return obj.action_detail_display + + action_detail_display.short_description = "Action" + action_detail_display.admin_order_field = "action" + + def transition_display(self, obj): + return obj.transition_display + + transition_display.short_description = "Transition" + + def target_summary(self, obj): + return obj.target_summary + + target_summary.short_description = "Target" + target_summary.admin_order_field = "target_repr" + + def evidence_summary(self, obj): + return obj.evidence_summary + + evidence_summary.short_description = "Evidence" + evidence_summary.admin_order_field = "evidence_category" + + def evidence_display(self, obj): + return format_html("{}
{}", obj.evidence_area_display, f"Source: {obj.source_display}") + + evidence_display.short_description = "Evidence" + evidence_display.admin_order_field = "evidence_category" + + def reconstructed_display(self, obj): + return obj.reconstructed_display + + reconstructed_display.short_description = "Reconstructed" + reconstructed_display.admin_order_field = "is_reconstructed" + + def old_values_display(self, obj): + return obj.old_values_display + + old_values_display.short_description = "Old values (resolved)" + + def new_values_display(self, obj): + return obj.new_values_display + + new_values_display.short_description = "New values (resolved)" + + def has_add_permission(self, request): + return False + + def has_change_permission(self, request, obj=None): + return False + + def has_delete_permission(self, request, obj=None): + return False + + def log_addition(self, request, obj, message): + return None + + def log_change(self, request, obj, message): + return None + + def log_deletion(self, request, obj, object_repr): + return None + + def log_deletions(self, request, queryset): + return [] + + +@admin.register(BackfillRun) +class BackfillRunAdmin(admin.ModelAdmin): + list_display = ("name", "started", "completed", "created_events", "dry_run") + readonly_fields = ("name", "started", "completed", "created_events", "dry_run", "notes") + search_fields = ("name", "notes") + list_filter = ("dry_run", "started", "completed") + + def has_add_permission(self, request): + return False + + def has_change_permission(self, request, obj=None): + return False + + def has_delete_permission(self, request, obj=None): + return False diff --git a/admin_audit.py b/admin_audit.py new file mode 100644 index 0000000..2ccb8c5 --- /dev/null +++ b/admin_audit.py @@ -0,0 +1,89 @@ +import json + +from django.contrib import admin + +from .models import AuditEvent +from .utils import log_event + + +PATCH_MARKER = "_carc_audit_admin_log_patch_installed" + + +def _message_to_text(message): + if isinstance(message, str): + return message + return json.dumps(message, default=str) + + +def _admin_change_message(obj, message): + details = _message_to_text(message) + if details: + return f"Admin changed {obj}: {details}" + return f"Admin changed {obj}" + + +def install_admin_audit_patch(): + if getattr(admin.ModelAdmin, PATCH_MARKER, False): + return + + original_log_addition = admin.ModelAdmin.log_addition + original_log_change = admin.ModelAdmin.log_change + original_log_deletion = admin.ModelAdmin.log_deletion + original_log_deletions = admin.ModelAdmin.log_deletions + + def log_addition(self, request, obj, message): + result = original_log_addition(self, request, obj, message) + log_event( + AuditEvent.Action.ADMIN_ADDITION, + obj, + message=f"Admin added {obj}", + actor=getattr(request, "user", None), + request=request, + source=AuditEvent.Source.DJANGO_ADMIN, + ) + return result + + def log_change(self, request, obj, message): + result = original_log_change(self, request, obj, message) + log_event( + AuditEvent.Action.ADMIN_CHANGE, + obj, + message=_admin_change_message(obj, message), + actor=getattr(request, "user", None), + request=request, + source=AuditEvent.Source.DJANGO_ADMIN, + ) + return result + + def log_deletion(self, request, obj, object_repr): + result = original_log_deletion(self, request, obj, object_repr) + log_event( + AuditEvent.Action.ADMIN_DELETION, + obj, + message=f"Admin deleted {object_repr}", + actor=getattr(request, "user", None), + request=request, + source=AuditEvent.Source.DJANGO_ADMIN, + target_repr=object_repr, + ) + return result + + def log_deletions(self, request, queryset): + objects = list(queryset) + result = original_log_deletions(self, request, queryset) + for obj in objects: + log_event( + AuditEvent.Action.ADMIN_DELETION, + obj, + message=f"Admin deleted {obj}", + actor=getattr(request, "user", None), + request=request, + source=AuditEvent.Source.DJANGO_ADMIN, + ) + return result + + admin.ModelAdmin.log_addition = log_addition + admin.ModelAdmin.log_change = log_change + admin.ModelAdmin.log_deletion = log_deletion + admin.ModelAdmin.log_deletions = log_deletions + setattr(admin.ModelAdmin, PATCH_MARKER, True) diff --git a/allocation_workflow_audit.py b/allocation_workflow_audit.py new file mode 100644 index 0000000..f7b4025 --- /dev/null +++ b/allocation_workflow_audit.py @@ -0,0 +1,66 @@ +from django.shortcuts import get_object_or_404 + +from coldfront.core.allocation.models import Allocation + +from .models import AuditEvent +from .resolvers import allocation_label +from .utils import log_event + + +PATCH_MARKER = "_carc_audit_allocation_workflow_patch_installed" + + +def _date_value(value): + return value.isoformat() if value else None + + +def _allocation_summary(allocation): + return allocation_label(allocation) + + +def install_allocation_workflow_audit_patch(): + from coldfront.core.allocation import views as allocation_views + + if getattr(allocation_views, PATCH_MARKER, False): + return + + original_renew_post = allocation_views.AllocationRenewView.post + + def renew_post(self, request, *args, **kwargs): + allocation = get_object_or_404( + Allocation.objects.select_related("project", "status"), + pk=self.kwargs.get("pk"), + ) + old_status = allocation.status.name + old_end_date = allocation.end_date + + response = original_renew_post(self, request, *args, **kwargs) + + allocation.refresh_from_db() + allocation = Allocation.objects.select_related("project", "status").get(pk=allocation.pk) + new_status = allocation.status.name + if old_status != new_status and new_status == "Renewal Requested": + log_event( + AuditEvent.Action.RENEWAL_REQUESTED, + allocation, + old_values={ + "status": old_status, + "end_date": _date_value(old_end_date), + "project_id": allocation.project_id, + }, + new_values={ + "status": new_status, + "end_date": _date_value(allocation.end_date), + "project_id": allocation.project_id, + }, + message=f"Renewal requested for {_allocation_summary(allocation)}", + actor=getattr(request, "user", None), + request=request, + source=AuditEvent.Source.COLDFRONT_WORKFLOW, + target_repr=_allocation_summary(allocation), + ) + + return response + + allocation_views.AllocationRenewView.post = renew_post + setattr(allocation_views, PATCH_MARKER, True) diff --git a/apps.py b/apps.py new file mode 100644 index 0000000..4711cc1 --- /dev/null +++ b/apps.py @@ -0,0 +1,18 @@ +from django.apps import AppConfig + + +class CmmcAuditConfig(AppConfig): + default_auto_field = 'django.db.models.BigAutoField' + name = 'coldfront.plugins.cmmc_audit' + label = 'cmmc_audit' + verbose_name = 'CMMC Audit' + + def ready(self): + from .allocation_workflow_audit import install_allocation_workflow_audit_patch + from .admin_audit import install_admin_audit_patch + from .project_review_audit import install_project_review_audit_patch + from . import signals # noqa: F401 + + install_admin_audit_patch() + install_allocation_workflow_audit_patch() + install_project_review_audit_patch() diff --git a/backfill.py b/backfill.py new file mode 100644 index 0000000..d9e3a12 --- /dev/null +++ b/backfill.py @@ -0,0 +1,635 @@ +import json +from collections import Counter + +from django.apps import apps +from django.contrib.admin.models import ADDITION, CHANGE, DELETION, LogEntry +from django.utils import timezone + +from .models import AuditEvent +from .resolvers import ( + allocation_label, + allocation_label_from_parts, + project_label, + project_label_from_id, + project_user_label, + user_label, + user_profile_label_from_id, +) +from .utils import ACTION_EVIDENCE_MAP + + +RUN_NAME = "initial_historical_backfill" +ADMIN_SOURCE = AuditEvent.Source.DJANGO_ADMIN_LOG +HISTORY_SOURCE = AuditEvent.Source.COLDFRONT_HISTORY + + +class BackfillReport: + def __init__(self): + self.examined = Counter() + self.created = Counter() + self.would_create = Counter() + self.duplicates = 0 + self.ambiguous = 0 + self.by_action = Counter() + self.by_evidence = Counter() + + def record_examined(self, source): + self.examined[source] += 1 + + def record_duplicate(self): + self.duplicates += 1 + + def record_ambiguous(self): + self.ambiguous += 1 + + def record_event(self, action, evidence_category, dry_run): + if dry_run: + self.would_create[action] += 1 + else: + self.created[action] += 1 + self.by_action[action] += 1 + self.by_evidence[evidence_category or "unclassified"] += 1 + + @property + def created_events(self): + return sum(self.created.values()) + + @property + def would_create_events(self): + return sum(self.would_create.values()) + + +def _json_message(change_message): + if not change_message: + return "" + if isinstance(change_message, str): + return change_message + return json.dumps(change_message, default=str) + + +def _changed_fields(change_message): + if not change_message: + return [] + try: + entries = json.loads(change_message) if isinstance(change_message, str) else change_message + except (TypeError, ValueError): + return [] + fields = [] + for entry in entries: + changed = entry.get("changed", {}) if isinstance(entry, dict) else {} + fields.extend(changed.get("fields", [])) + return fields + + +def _has_changed_field(change_message, field_name): + normalized = {field.lower().replace("_", " ") for field in _changed_fields(change_message)} + return field_name.lower().replace("_", " ") in normalized + + +def _target_from_logentry(log_entry): + if not log_entry.content_type_id: + return "", "", log_entry.object_id or "", log_entry.object_repr + app_label = log_entry.content_type.app_label + model_name = log_entry.content_type.model + return app_label, model_name, log_entry.object_id or "", log_entry.object_repr + + +def _defaults_for_action(action): + evidence_category, control_family = ACTION_EVIDENCE_MAP.get(action, ("", "")) + return evidence_category, control_family + + +def _create_event( + *, + action, + source, + source_id, + event_time, + actor, + target_type, + target_id, + target_repr, + message, + old_values=None, + new_values=None, + dry_run, + report, +): + if AuditEvent.objects.filter(source=source, source_id=str(source_id), action=action).exists(): + report.record_duplicate() + return None + + evidence_category, control_family = _defaults_for_action(action) + report.record_event(action, evidence_category, dry_run) + if dry_run: + return None + + return AuditEvent.objects.create( + actor=actor, + action=action, + evidence_category=evidence_category, + control_family=control_family, + target_type=target_type, + target_id=str(target_id or "")[:128], + target_repr=str(target_repr or "")[:255], + old_values=old_values or {}, + new_values=new_values or {}, + message=message, + source=source, + source_id=str(source_id)[:128], + is_reconstructed=True, + event_time=event_time, + ) + + +def _admin_action(log_entry): + if log_entry.action_flag == ADDITION: + return AuditEvent.Action.ADMIN_ADDITION + if log_entry.action_flag == CHANGE: + return AuditEvent.Action.ADMIN_CHANGE + if log_entry.action_flag == DELETION: + return AuditEvent.Action.ADMIN_DELETION + return None + + +def _admin_message(log_entry, action, object_repr=None): + change_message = _json_message(log_entry.change_message) + target_repr = object_repr or log_entry.object_repr + if action == AuditEvent.Action.ADMIN_ADDITION: + verb = "added" + elif action == AuditEvent.Action.ADMIN_DELETION: + verb = "deleted" + else: + verb = "changed" + suffix = f" Change message: {change_message}" if change_message else "" + return f"Reconstructed from Django admin log: admin {verb} {target_repr}.{suffix}" + + +def _admin_target_repr(log_entry, target_type, object_id, object_repr): + if target_type == "user.userprofile": + return user_profile_label_from_id(object_id) + if target_type == "project.project" and log_entry.action_flag == DELETION: + return f"Deleted project: {object_repr} (id {object_id})" + return object_repr + + +def backfill_admin_log(*, dry_run, report): + for log_entry in LogEntry.objects.select_related("content_type", "user").order_by("pk"): + report.record_examined("django_admin_log") + action = _admin_action(log_entry) + if action is None: + report.record_ambiguous() + continue + + app_label, model_name, object_id, object_repr = _target_from_logentry(log_entry) + target_type = f"{app_label}.{model_name}" if app_label and model_name else "" + target_repr = _admin_target_repr(log_entry, target_type, object_id, object_repr) + change_message = _json_message(log_entry.change_message) + _create_event( + action=action, + source=ADMIN_SOURCE, + source_id=log_entry.pk, + event_time=log_entry.action_time, + actor=log_entry.user, + target_type=target_type, + target_id=object_id, + target_repr=target_repr, + message=_admin_message(log_entry, action, target_repr), + new_values={"change_message": change_message, "content_type": target_type}, + dry_run=dry_run, + report=report, + ) + + for semantic in _semantic_events_from_logentry(log_entry, target_type, object_id, target_repr, change_message): + _create_event( + source=ADMIN_SOURCE, + source_id=log_entry.pk, + event_time=log_entry.action_time, + actor=log_entry.user, + dry_run=dry_run, + report=report, + **semantic, + ) + + +def _semantic_events_from_logentry(log_entry, target_type, object_id, object_repr, change_message): + events = [] + prefix = "Reconstructed from Django admin log:" + fields = ", ".join(_changed_fields(change_message)) or "unknown fields" + + if target_type == "user.userprofile" and _has_changed_field(change_message, "Is pi"): + events.append( + { + "action": AuditEvent.Action.USER_PI_UPGRADED, + "target_type": target_type, + "target_id": object_id, + "target_repr": object_repr, + "message": f"{prefix} PI status changed for {object_repr}; direction unknown", + "new_values": {"change_message": change_message, "direction": "unknown"}, + } + ) + elif target_type == "project.projectuser" and _has_changed_field(change_message, "Role"): + events.append( + { + "action": AuditEvent.Action.PROJECT_USER_ROLE_CHANGED, + "target_type": target_type, + "target_id": object_id, + "target_repr": object_repr, + "message": f"{prefix} project user role changed for {object_repr}; exact old/new roles unknown.", + "new_values": {"change_message": change_message, "fields": fields}, + } + ) + elif target_type == "resource.resource": + resource_action = { + ADDITION: AuditEvent.Action.RESOURCE_CREATED, + CHANGE: AuditEvent.Action.RESOURCE_CHANGED, + DELETION: AuditEvent.Action.RESOURCE_DELETED, + }.get(log_entry.action_flag) + if resource_action: + events.append( + { + "action": resource_action, + "target_type": target_type, + "target_id": object_id, + "target_repr": object_repr, + "message": f"{prefix} resource event for {object_repr}.", + "new_values": {"change_message": change_message, "fields": fields}, + } + ) + elif target_type == "project.project": + project_action = { + ADDITION: AuditEvent.Action.PROJECT_CREATED, + DELETION: AuditEvent.Action.PROJECT_DELETED, + }.get(log_entry.action_flag) + if project_action: + events.append( + { + "action": project_action, + "target_type": target_type, + "target_id": object_id, + "target_repr": object_repr, + "message": f"{prefix} project event for {object_repr}.", + "new_values": {"change_message": change_message, "fields": fields}, + } + ) + elif log_entry.action_flag == CHANGE and _has_changed_field(change_message, "Pi"): + events.append( + { + "action": AuditEvent.Action.PROJECT_PI_CHANGED, + "target_type": target_type, + "target_id": object_id, + "target_repr": object_repr, + "message": f"{prefix} project PI changed for {object_repr}; exact old/new PI unknown.", + "new_values": {"change_message": change_message, "fields": fields}, + } + ) + elif log_entry.action_flag == CHANGE and _has_changed_field(change_message, "Status"): + events.append( + { + "action": AuditEvent.Action.PROJECT_STATUS_CHANGED, + "target_type": target_type, + "target_id": object_id, + "target_repr": object_repr, + "message": f"{prefix} project status changed for {object_repr}; exact old/new status unknown.", + "new_values": {"change_message": change_message, "fields": fields}, + } + ) + return events + + +def _model_or_none(label): + try: + return apps.get_model(label) + except LookupError: + return None + + +def _choice_name(model_label, pk): + if pk in (None, ""): + return None + model = _model_or_none(model_label) + if model is None: + return str(pk) + try: + return model.objects.get(pk=pk).name + except Exception: + return str(pk) + + +def _history_source_id(model_label, history): + return f"{model_label}:{history.history_id}" + + +def _history_actor(history): + return getattr(history, "history_user", None) + + +def _resource_repr(history): + if getattr(history, "name", ""): + return f"Resource: {history.name}" + return f"Resource id {history.id}" + + +def _project_reference(project_id): + if project_id in (None, ""): + return "Project unknown" + Project = _model_or_none("project.Project") + if Project is None: + return f"Project id {project_id} (model unavailable)" + return project_label_from_id(project_id) + + +def _project_history_repr(history, *, deleted=False): + title = getattr(history, "title", "") or f"Project id {history.id}" + if deleted: + return f"Deleted project: {title} (id {history.id})" + current_label = project_label_from_id(history.id) + if "(not found)" not in current_label: + return current_label + return f"Project: {title} (id {history.id})" + + +def _user_reference(user_id): + if user_id in (None, ""): + return "User unknown" + User = _model_or_none("auth.User") + if User is None: + return f"User id {user_id} (model unavailable)" + try: + return f"{user_label(User.objects.get(pk=user_id))} (id {user_id})" + except Exception: + return f"User id {user_id} (not found)" + + +def _allocation_repr(history): + Allocation = _model_or_none("allocation.Allocation") + if Allocation is not None: + try: + return allocation_label(Allocation.objects.get(pk=history.id)) + except Exception: + pass + resource_names = getattr(history, "resource_names", []) + return allocation_label_from_parts(resource_names, _project_reference(getattr(history, "project_id", None))) + + +def _project_user_repr(history): + ProjectUser = _model_or_none("project.ProjectUser") + if ProjectUser is not None: + try: + return project_user_label(ProjectUser.objects.get(pk=history.id)) + except Exception: + pass + return ( + f"Project user {history.id}: " + f"{_user_reference(getattr(history, 'user_id', None))} on " + f"{_project_reference(getattr(history, 'project_id', None))}" + ) + + +def _iter_adjacent(history_model): + rows = list(history_model.objects.order_by("id", "history_date", "history_id")) + previous_by_id = {} + for row in rows: + previous = previous_by_id.get(row.id) + yield previous, row + previous_by_id[row.id] = row + + +def backfill_history(*, dry_run, report): + _backfill_project_history(dry_run=dry_run, report=report) + _backfill_resource_history(dry_run=dry_run, report=report) + _backfill_project_user_history(dry_run=dry_run, report=report) + _backfill_allocation_history(dry_run=dry_run, report=report) + + +def _backfill_project_history(*, dry_run, report): + model_label = "project.HistoricalProject" + model = _model_or_none(model_label) + if model is None: + return + for previous, history in _iter_adjacent(model): + report.record_examined(model_label) + if history.history_type == "+": + _create_event( + action=AuditEvent.Action.PROJECT_CREATED, + source=HISTORY_SOURCE, + source_id=_history_source_id(model_label, history), + event_time=history.history_date, + actor=_history_actor(history), + target_type="project.project", + target_id=history.id, + target_repr=_project_history_repr(history), + message=f"Reconstructed from ColdFront history: project created: {_project_history_repr(history)}.", + new_values=_project_history_values(history), + dry_run=dry_run, + report=report, + ) + continue + + if history.history_type == "-": + _create_event( + action=AuditEvent.Action.PROJECT_DELETED, + source=HISTORY_SOURCE, + source_id=_history_source_id(model_label, history), + event_time=history.history_date, + actor=_history_actor(history), + target_type="project.project", + target_id=history.id, + target_repr=_project_history_repr(history, deleted=True), + message=f"Reconstructed from ColdFront history: project deleted: {_project_history_repr(history, deleted=True)}.", + old_values=_project_history_values(history), + dry_run=dry_run, + report=report, + ) + continue + + if history.history_type != "~" or previous is None: + report.record_ambiguous() + continue + + event_created = False + if previous.status_id != history.status_id: + old_status = _choice_name("project.ProjectStatusChoice", previous.status_id) + new_status = _choice_name("project.ProjectStatusChoice", history.status_id) + action = ( + AuditEvent.Action.PROJECT_ARCHIVED + if new_status == "Archived" + else AuditEvent.Action.PROJECT_STATUS_CHANGED + ) + _create_event( + action=action, + source=HISTORY_SOURCE, + source_id=_history_source_id(model_label, history), + event_time=history.history_date, + actor=_history_actor(history), + target_type="project.project", + target_id=history.id, + target_repr=_project_history_repr(history), + message=f"Reconstructed from ColdFront history: project status changed from {old_status} to {new_status}.", + old_values={"status": old_status, "pi_id": previous.pi_id}, + new_values={"status": new_status, "pi_id": history.pi_id}, + dry_run=dry_run, + report=report, + ) + event_created = True + + if previous.pi_id != history.pi_id: + _create_event( + action=AuditEvent.Action.PROJECT_PI_CHANGED, + source=HISTORY_SOURCE, + source_id=_history_source_id(model_label, history), + event_time=history.history_date, + actor=_history_actor(history), + target_type="project.project", + target_id=history.id, + target_repr=_project_history_repr(history), + message=( + "Reconstructed from ColdFront history: project PI changed " + f"from {_user_reference(previous.pi_id)} to {_user_reference(history.pi_id)}." + ), + old_values={"pi_id": previous.pi_id}, + new_values={"pi_id": history.pi_id}, + dry_run=dry_run, + report=report, + ) + event_created = True + + if not event_created: + report.record_ambiguous() + + +def _backfill_resource_history(*, dry_run, report): + model_label = "resource.HistoricalResource" + model = _model_or_none(model_label) + if model is None: + return + tracked = ("name", "description", "is_available", "is_public", "is_allocatable", "requires_payment") + for previous, history in _iter_adjacent(model): + report.record_examined(model_label) + if history.history_type == "+": + action = AuditEvent.Action.RESOURCE_CREATED + elif history.history_type == "-": + action = AuditEvent.Action.RESOURCE_DELETED + elif history.history_type == "~" and previous is not None and any( + getattr(previous, field) != getattr(history, field) for field in tracked + ): + action = AuditEvent.Action.RESOURCE_CHANGED + else: + report.record_ambiguous() + continue + _create_event( + action=action, + source=HISTORY_SOURCE, + source_id=_history_source_id(model_label, history), + event_time=history.history_date, + actor=_history_actor(history), + target_type="resource.resource", + target_id=history.id, + target_repr=_resource_repr(history), + message=f"Reconstructed from ColdFront history: resource {_resource_repr(history)} {action.label.lower()}.", + old_values=_history_values(previous, tracked) if previous else {}, + new_values=_history_values(history, tracked) if history.history_type != "-" else {}, + dry_run=dry_run, + report=report, + ) + + +def _backfill_project_user_history(*, dry_run, report): + model_label = "project.HistoricalProjectUser" + model = _model_or_none(model_label) + if model is None: + return + for previous, history in _iter_adjacent(model): + report.record_examined(model_label) + if history.history_type != "~" or previous is None or previous.role_id == history.role_id: + report.record_ambiguous() + continue + old_role = _choice_name("project.ProjectUserRoleChoice", previous.role_id) + new_role = _choice_name("project.ProjectUserRoleChoice", history.role_id) + _create_event( + action=AuditEvent.Action.PROJECT_USER_ROLE_CHANGED, + source=HISTORY_SOURCE, + source_id=_history_source_id(model_label, history), + event_time=history.history_date, + actor=_history_actor(history), + target_type="project.projectuser", + target_id=history.id, + target_repr=_project_user_repr(history), + message=f"Reconstructed from ColdFront history: project user role changed from {old_role} to {new_role}.", + old_values={"role": old_role, "project_id": previous.project_id, "user_id": previous.user_id}, + new_values={"role": new_role, "project_id": history.project_id, "user_id": history.user_id}, + dry_run=dry_run, + report=report, + ) + + +def _backfill_allocation_history(*, dry_run, report): + model_label = "allocation.HistoricalAllocation" + model = _model_or_none(model_label) + if model is None: + return + for previous, history in _iter_adjacent(model): + report.record_examined(model_label) + if history.history_type != "~" or previous is None: + report.record_ambiguous() + continue + + if previous.status_id != history.status_id: + new_status = _choice_name("allocation.AllocationStatusChoice", history.status_id) + old_status = _choice_name("allocation.AllocationStatusChoice", previous.status_id) + action = AuditEvent.Action.ALLOCATION_DISABLED if new_status in {"Denied", "Revoked", "Disabled"} else AuditEvent.Action.ALLOCATION_STATUS_CHANGED + _create_event( + action=action, + source=HISTORY_SOURCE, + source_id=_history_source_id(model_label, history), + event_time=history.history_date, + actor=_history_actor(history), + target_type="allocation.allocation", + target_id=history.id, + target_repr=_allocation_repr(history), + message=f"Reconstructed from ColdFront history: allocation status changed from {old_status} to {new_status}.", + old_values={"status": old_status, "project_id": previous.project_id}, + new_values={"status": new_status, "project_id": history.project_id}, + dry_run=dry_run, + report=report, + ) + + if previous.end_date and history.end_date and history.end_date > previous.end_date: + _create_event( + action=AuditEvent.Action.ALLOCATION_RENEWED, + source=HISTORY_SOURCE, + source_id=_history_source_id(model_label, history), + event_time=history.history_date, + actor=_history_actor(history), + target_type="allocation.allocation", + target_id=history.id, + target_repr=_allocation_repr(history), + message=f"Reconstructed from ColdFront history: allocation end date increased from {previous.end_date} to {history.end_date}.", + old_values={"end_date": previous.end_date.isoformat(), "project_id": previous.project_id}, + new_values={"end_date": history.end_date.isoformat(), "project_id": history.project_id}, + dry_run=dry_run, + report=report, + ) + + +def _history_values(history, fields): + if history is None: + return {} + return {field: getattr(history, field) for field in fields} + + +def _project_history_values(history): + return { + "title": history.title, + "status": _choice_name("project.ProjectStatusChoice", history.status_id), + "status_id": history.status_id, + "pi_id": history.pi_id, + "project_code": history.project_code, + } + + +def run_backfill(*, dry_run): + report = BackfillReport() + backfill_admin_log(dry_run=dry_run, report=report) + backfill_history(dry_run=dry_run, report=report) + return report diff --git a/middleware.py b/middleware.py new file mode 100644 index 0000000..8c8693f --- /dev/null +++ b/middleware.py @@ -0,0 +1,26 @@ +import threading + + +_state = threading.local() + + +def get_current_actor(): + return getattr(_state, "actor", None) + + +def get_current_request(): + return getattr(_state, "request", None) + + +class AuditActorMiddleware: + def __init__(self, get_response): + self.get_response = get_response + + def __call__(self, request): + _state.request = request + _state.actor = getattr(request, "user", None) + try: + return self.get_response(request) + finally: + _state.request = None + _state.actor = None diff --git a/models.py b/models.py new file mode 100644 index 0000000..acfaab8 --- /dev/null +++ b/models.py @@ -0,0 +1,303 @@ +from django.conf import settings +from django.db import models +from django.utils import timezone + +from .resolvers import actor_label, resolve_target_label, resolve_value_labels + + +ALLOCATION_TRANSITION_ACTIONS = { + "allocation_status_changed", + "allocation_disabled", + "renewal_requested", + "renewal_approved", + "allocation_renewed", +} + + +EVIDENCE_AREA_LABELS = { + "AC": "Access Control", + "AU": "Audit Logging", + "CM": "Configuration Management", + "IA": "Identification and Authentication", +} + +SOURCE_LABELS = { + "runtime": "Runtime", + "django_admin": "Django admin", + "coldfront_signal": "ColdFront signal", + "coldfront_workflow": "ColdFront workflow", + "reconstructed_django_admin_log": "Reconstructed Django admin log", + "django_admin_log": "Django admin log", + "coldfront_history": "ColdFront history table", +} + + +class AuditEvent(models.Model): + class EvidenceCategory(models.TextChoices): + AC_ACCESS_CONTROL = "AC-Access-Control", "AC - Access Control" + AU_AUDIT_ACCOUNTABILITY = "AU-Audit-Accountability", "AU - Audit and Accountability" + AC_AU = "AC-Access-Control; AU-Audit-Accountability", "AC/AU - Access and Audit" + CM_CONFIGURATION_MANAGEMENT = "CM-Configuration-Management", "CM - Configuration Management" + IA_IDENTIFICATION_AUTHENTICATION = ( + "IA-Identification-Authentication", + "IA - Identification and Authentication", + ) + + class Source(models.TextChoices): + RUNTIME = "runtime", "Runtime" + DJANGO_ADMIN = "django_admin", "Django admin" + COLDFRONT_SIGNAL = "coldfront_signal", "ColdFront signal" + COLDFRONT_WORKFLOW = "coldfront_workflow", "ColdFront workflow" + RECONSTRUCTED_DJANGO_ADMIN_LOG = ( + "reconstructed_django_admin_log", + "Reconstructed Django admin log", + ) + DJANGO_ADMIN_LOG = "django_admin_log", "Django admin log" + COLDFRONT_HISTORY = "coldfront_history", "ColdFront history" + + class Action(models.TextChoices): + ADMIN_ADDITION = "admin_addition", "Admin addition" + ADMIN_CHANGE = "admin_change", "Admin change" + ADMIN_DELETION = "admin_deletion", "Admin deletion" + USER_PI_UPGRADED = "user_pi_upgraded", "User upgraded to PI" + PI_STATUS_REVOKED = "pi_status_revoked", "PI status revoked" + USER_ADMIN_PRIVILEGES_CHANGED = "user_admin_privileges_changed", "User admin privileges changed" + ALLOCATION_REQUESTED = "allocation_requested", "Allocation created/requested" + ALLOCATION_CHANGE_REQUESTED = "allocation_change_requested", "Allocation change requested" + ALLOCATION_STATUS_CHANGED = "allocation_status_changed", "Allocation status changed" + ALLOCATION_DISABLED = "allocation_disabled", "Allocation disabled" + RENEWAL_REQUESTED = "renewal_requested", "Renewal requested" + RENEWAL_APPROVED = "renewal_approved", "Renewal approved" + ALLOCATION_RENEWED = "allocation_renewed", "Allocation renewed" + PROJECT_USER_ADDED = "project_user_added", "Project user added" + PROJECT_USER_REMOVED = "project_user_removed", "Project user removed" + PROJECT_USER_ROLE_CHANGED = "project_user_role_changed", "Project user role changed" + PROJECT_CREATED = "project_created", "Project created" + PROJECT_ARCHIVED = "project_archived", "Project archived" + PROJECT_DELETED = "project_deleted", "Project deleted" + PROJECT_STATUS_CHANGED = "project_status_changed", "Project status changed" + PROJECT_PI_CHANGED = "project_pi_changed", "Project PI changed" + PROJECT_REVIEW_FORCED = "project_review_forced", "Project review forced" + PROJECT_REVIEW_SUBMITTED = "project_review_submitted", "Project review submitted" + PROJECT_REVIEW_COMPLETED = "project_review_completed", "Project review completed" + PROJECT_REVIEW_STATUS_CHANGED = "project_review_status_changed", "Project review status changed" + RESOURCE_CREATED = "resource_created", "Resource created" + RESOURCE_CHANGED = "resource_changed", "Resource changed" + RESOURCE_DELETED = "resource_deleted", "Resource deleted" + + timestamp = models.DateTimeField(auto_now_add=True) + event_time = models.DateTimeField(default=timezone.now) + + actor = models.ForeignKey( + settings.AUTH_USER_MODEL, + null=True, + blank=True, + on_delete=models.SET_NULL, + related_name="carc_audit_events", + ) + + action = models.CharField(max_length=64, choices=Action.choices) + evidence_category = models.CharField( + max_length=128, + choices=EvidenceCategory.choices, + blank=True, + ) + control_family = models.CharField(max_length=16, blank=True) + target_type = models.CharField(max_length=128, blank=True) + target_id = models.CharField(max_length=128, blank=True) + target_repr = models.CharField(max_length=255, blank=True) + + old_values = models.JSONField(default=dict, blank=True) + new_values = models.JSONField(default=dict, blank=True) + message = models.TextField(blank=True) + source = models.CharField(max_length=64, choices=Source.choices, default=Source.RUNTIME) + source_id = models.CharField(max_length=128, blank=True) + is_reconstructed = models.BooleanField(default=False) + request_path = models.CharField(max_length=255, blank=True) + ip_address = models.GenericIPAddressField(null=True, blank=True) + + class Meta: + ordering = ["-timestamp"] + indexes = [ + models.Index(fields=["event_time"]), + models.Index(fields=["action"]), + models.Index(fields=["evidence_category"]), + models.Index(fields=["source"]), + models.Index(fields=["actor"]), + models.Index(fields=["target_type", "target_id"]), + ] + + def __str__(self): + return f"{self.event_time} {self.action_detail_display} {self.target_summary}" + + @property + def action_display(self): + return self.get_action_display() + + @property + def action_detail_display(self): + if self.action not in ALLOCATION_TRANSITION_ACTIONS: + return self.action_display + + transition = self.transition_display + if self.action == self.Action.ALLOCATION_STATUS_CHANGED: + if self._new_status() == "Active" and self._old_status() == "New": + label = "Allocation approved" + else: + label = "Allocation status changed" + elif self.action == self.Action.ALLOCATION_DISABLED: + if self._new_status() == "Revoked": + label = "Allocation revoked" + else: + label = "Allocation disabled" + elif self.action == self.Action.RENEWAL_APPROVED: + label = "Allocation renewal approved" + elif self.action == self.Action.ALLOCATION_RENEWED: + label = "Allocation renewed" + else: + label = self.action_display + + if transition: + return f"{label} ({transition})" + return label + + @property + def transition_display(self): + if self.action not in ALLOCATION_TRANSITION_ACTIONS: + return "" + + old_status = self._old_status() + new_status = self._new_status() + if old_status and new_status: + return f"{old_status} \u2192 {new_status}" + + if self.action == self.Action.ALLOCATION_RENEWED: + days = self._renewal_days() + if days: + return f"+{days} days" + + return "" + + def _old_status(self): + return self._value_from(self.old_values, "status") + + def _new_status(self): + return self._value_from(self.new_values, "status") + + def _renewal_days(self): + extension = self._value_from(self.new_values, "end_date_extension") + if isinstance(extension, int) and extension > 0: + return extension + if isinstance(extension, str) and extension.isdigit() and int(extension) > 0: + return int(extension) + + old_end_date = self._value_from(self.old_values, "end_date") + new_end_date = self._value_from(self.new_values, "end_date") + if not old_end_date or not new_end_date: + return None + try: + old_date = timezone.datetime.fromisoformat(old_end_date).date() + new_date = timezone.datetime.fromisoformat(new_end_date).date() + except (TypeError, ValueError): + return None + days = (new_date - old_date).days + return days if days > 0 else None + + def _value_from(self, values, key): + if not isinstance(values, dict): + return None + value = values.get(key) + if value not in (None, ""): + return value + return values.get(f"allocation_{key}") + + @property + def actor_display(self): + return actor_label(self.actor) + + @property + def target_summary(self): + resolved = resolve_target_label(self.target_type, self.target_id) + if resolved and not ( + self.target_type == "project.project" + and self.target_repr + and "(not found)" in resolved + ): + return resolved + if self.target_repr: + return self.target_repr + if self.target_type and self.target_id: + return f"{self.target_type} #{self.target_id}" + return self.target_type or "Unknown target" + + @property + def evidence_summary(self): + if not self.evidence_category: + return "" + if self.control_family: + return f"{self.control_family}: {self.evidence_category}" + return self.evidence_category + + @property + def evidence_area_display(self): + if self.control_family: + areas = [ + EVIDENCE_AREA_LABELS.get(family.strip(), family.strip()) + for family in self.control_family.split(",") + if family.strip() + ] + if areas: + return " + ".join(areas) + + if self.evidence_category: + category = str(self.evidence_category) + if category.startswith("AC-"): + return EVIDENCE_AREA_LABELS["AC"] + if category.startswith("AU-"): + return EVIDENCE_AREA_LABELS["AU"] + if category.startswith("CM-"): + return EVIDENCE_AREA_LABELS["CM"] + if category.startswith("IA-"): + return EVIDENCE_AREA_LABELS["IA"] + return category + + return "Unclassified evidence" + + @property + def source_display(self): + return SOURCE_LABELS.get(self.source, f"Unknown source ({self.source or 'blank'})") + + @property + def evidence_display(self): + return f"{self.evidence_area_display}\nSource: {self.source_display}" + + @property + def reconstructed_display(self): + if not self.is_reconstructed: + return "No" + if self.source_id: + return f"Yes ({self.source_id})" + return "Yes" + + @property + def old_values_display(self): + return resolve_value_labels(self.old_values) + + @property + def new_values_display(self): + return resolve_value_labels(self.new_values) + + +class BackfillRun(models.Model): + name = models.CharField(max_length=128, unique=True) + started = models.DateTimeField(auto_now_add=True) + completed = models.DateTimeField(null=True, blank=True) + created_events = models.PositiveIntegerField(default=0) + dry_run = models.BooleanField(default=False) + notes = models.TextField(blank=True) + + class Meta: + ordering = ["-started"] + + def __str__(self): + return self.name diff --git a/project_review_audit.py b/project_review_audit.py new file mode 100644 index 0000000..00f0951 --- /dev/null +++ b/project_review_audit.py @@ -0,0 +1,116 @@ +import threading + +from django.shortcuts import get_object_or_404 + +from coldfront.core.project.models import Project, ProjectReview + +from .models import AuditEvent +from .resolvers import project_label, project_review_label +from .utils import log_event + + +PATCH_MARKER = "_carc_audit_project_review_patch_installed" +_state = threading.local() + + +def _suppressed_project_review_ids(): + ids = getattr(_state, "suppressed_project_review_ids", None) + if ids is None: + ids = set() + _state.suppressed_project_review_ids = ids + return ids + + +def is_project_review_status_suppressed(project_review_pk): + return project_review_pk in _suppressed_project_review_ids() + + +def _status_name(project_review): + return getattr(project_review.status, "name", None) + + +def _review_values(project_review): + return { + "project_id": project_review.project_id, + "status": _status_name(project_review), + "reason_for_not_updating_project": project_review.reason_for_not_updating_project or "", + } + + +def install_project_review_audit_patch(): + from coldfront.core.project import views as project_views + + if getattr(project_views, PATCH_MARKER, False): + return + + original_review_post = project_views.ProjectReviewView.post + original_complete_get = project_views.ProjectReviewCompleteView.get + + def review_post(self, request, *args, **kwargs): + project = get_object_or_404(Project, pk=self.kwargs.get("pk")) + before_pk = ( + ProjectReview.objects.filter(project=project) + .order_by("-pk") + .values_list("pk", flat=True) + .first() + ) + + response = original_review_post(self, request, *args, **kwargs) + + project_review = ( + ProjectReview.objects.filter(project=project) + .select_related("status", "project") + .order_by("-pk") + .first() + ) + if project_review and project_review.pk != before_pk: + log_event( + AuditEvent.Action.PROJECT_REVIEW_SUBMITTED, + project_review, + new_values=_review_values(project_review), + message=( + f"Project review submitted for {project_label(project_review.project)} " + f"with status {_status_name(project_review)}" + ), + actor=getattr(request, "user", None), + request=request, + source=AuditEvent.Source.COLDFRONT_WORKFLOW, + target_repr=project_review_label(project_review), + ) + + return response + + def complete_get(self, request, project_review_pk): + project_review = get_object_or_404( + ProjectReview.objects.select_related("status", "project"), + pk=project_review_pk, + ) + old_status = _status_name(project_review) + suppressed_ids = _suppressed_project_review_ids() + suppressed_ids.add(project_review_pk) + try: + response = original_complete_get(self, request, project_review_pk) + finally: + suppressed_ids.discard(project_review_pk) + + project_review.refresh_from_db() + project_review = ProjectReview.objects.select_related("status", "project").get(pk=project_review.pk) + new_status = _status_name(project_review) + if old_status != new_status and new_status == "Completed": + log_event( + AuditEvent.Action.PROJECT_REVIEW_COMPLETED, + project_review, + old_values={"status": old_status, "project_id": project_review.project_id}, + new_values={"status": new_status, "project_id": project_review.project_id}, + message=f"Project review completed for {project_label(project_review.project)}", + actor=getattr(request, "user", None), + request=request, + source=AuditEvent.Source.COLDFRONT_WORKFLOW, + target_repr=project_review_label(project_review), + ) + + return response + + project_views.ProjectReviewView.post = review_post + project_views.ProjectReviewCompleteView.get = complete_get + setattr(project_views, PATCH_MARKER, True) diff --git a/resolvers.py b/resolvers.py new file mode 100644 index 0000000..985fbf1 --- /dev/null +++ b/resolvers.py @@ -0,0 +1,174 @@ +from django.contrib.auth import get_user_model + + +def _get_model(app_label, model_name): + from django.apps import apps + + return apps.get_model(app_label, model_name) + + +def _object_label(app_label, model_name, pk, formatter=None): + if pk in (None, ""): + return None + try: + obj = _get_model(app_label, model_name).objects.get(pk=pk) + except Exception: + return None + if formatter is not None: + return formatter(obj) + return str(obj) + + +def user_label(user): + full_name = user.get_full_name() + if full_name: + return f"{user.username} ({full_name})" + return user.username + + +def user_profile_label(user_profile): + try: + return user_label(user_profile.user) + except Exception: + return f"UserProfile id {user_profile.pk} (user not found)" + + +def user_profile_label_from_id(user_profile_id): + try: + user_profile = _get_model("user", "UserProfile").objects.select_related("user").get(pk=user_profile_id) + except Exception: + return f"UserProfile id {user_profile_id} (user not found)" + return user_profile_label(user_profile) + + +def _resource_allocation_prefix(resource_names): + names = [name for name in resource_names if name] + if not names: + return "Allocation" + if len(names) <= 2: + return f"{', '.join(names)} allocation" + return f"{', '.join(names[:2])} + {len(names) - 2} more allocation" + + +def allocation_label_from_parts(resource_names, project_reference): + return f"{_resource_allocation_prefix(resource_names)} for {project_reference}" + + +def project_label_from_id(project_id): + if project_id in (None, ""): + return "Project unknown" + try: + return project_label(_get_model("project", "Project").objects.get(pk=project_id)) + except Exception: + return f"Project id {project_id} (not found)" + + +def allocation_label(allocation): + resources = list(allocation.resources.values_list("name", flat=True).order_by("name")) + try: + project_reference = project_label(allocation.project) + except Exception: + project_reference = project_label_from_id(getattr(allocation, "project_id", None)) + return allocation_label_from_parts(resources, project_reference) + + +def allocation_change_label(allocation_change): + return f"change request for {allocation_label(allocation_change.allocation)}" + + +def project_user_label(project_user): + return f"{project_user.user.username} on {project_label(project_user.project)}" + + +def project_review_label(project_review): + return f"review for {project_label(project_review.project)}" + + +def project_label(project): + label = f"Project: {project.title}" + if getattr(project, "project_code", ""): + label = f"{label} (code {project.project_code})" + return label + + +def resolve_target_label(target_type, target_id): + formatter_map = { + "allocation.allocation": lambda obj: allocation_label(obj), + "allocation.allocationchangerequest": lambda obj: allocation_change_label(obj), + "auth.user": lambda obj: user_label(obj), + "user.userprofile": lambda obj: user_profile_label(obj), + "project.project": lambda obj: project_label(obj), + "project.projectuser": lambda obj: project_user_label(obj), + "project.projectreview": lambda obj: project_review_label(obj), + "resource.resource": lambda obj: obj.name, + } + if "." not in target_type: + return None + app_label, model_name = target_type.split(".", 1) + if target_type == "project.project": + return project_label_from_id(target_id) + return _object_label(app_label, model_name, target_id, formatter_map.get(target_type)) + + +def _resolve_id(key, value): + if key in {"project_id", "project"}: + label = _object_label("project", "Project", value, lambda obj: project_label(obj)) + elif key in {"user_id", "user"}: + label = _object_label("auth", "User", value, lambda obj: user_label(obj)) + elif key in {"allocation_id", "allocation"}: + label = _object_label("allocation", "Allocation", value, lambda obj: allocation_label(obj)) + elif key in {"allocation_change_request_id", "allocation_change_request"}: + label = _object_label( + "allocation", + "AllocationChangeRequest", + value, + lambda obj: allocation_change_label(obj), + ) + elif key in {"resource_type_id", "resource_type"}: + label = _object_label("resource", "ResourceType", value) + elif key in {"parent_resource_id", "parent_resource"}: + label = _object_label("resource", "Resource", value, lambda obj: obj.name) + else: + label = None + + if not label: + return value + return f"{label} (id {value})" + + +def _resolve_id_list(key, values): + if key == "resource_ids": + labels = [] + for value in values: + label = _object_label("resource", "Resource", value, lambda obj: obj.name) + labels.append(f"{label} (id {value})" if label else value) + return labels + return values + + +def resolve_value_labels(values): + if not isinstance(values, dict): + return values + + resolved = {} + for key, value in values.items(): + if isinstance(value, list): + resolved[key] = _resolve_id_list(key, value) + elif key.endswith("_id") or key in {"project", "user", "allocation", "resource_type", "parent_resource"}: + resolved[key] = _resolve_id(key, value) + else: + resolved[key] = value + return resolved + + +def actor_label(actor): + if actor is None: + return "programmatic" + return user_label(actor) + + +def username_from_id(user_id): + try: + return user_label(get_user_model().objects.get(pk=user_id)) + except Exception: + return str(user_id) diff --git a/signals.py b/signals.py new file mode 100644 index 0000000..43d9c02 --- /dev/null +++ b/signals.py @@ -0,0 +1,538 @@ +import datetime + +from django.contrib.auth import get_user_model +from django.db.models.signals import post_delete, post_save, pre_save +from django.dispatch import receiver + +from coldfront.core.allocation.models import Allocation, AllocationChangeRequest +from coldfront.core.allocation.signals import ( + allocation_activate, + allocation_change_approved, + allocation_change_created, + allocation_disable, + allocation_new, +) +from coldfront.core.project.models import Project, ProjectReview, ProjectUser +from coldfront.core.project.signals import project_activate_user, project_archive, project_remove_user +from coldfront.core.resource.models import Resource +from coldfront.core.user.models import UserProfile + +from .models import AuditEvent +from .resolvers import allocation_label, project_label, project_review_label, project_user_label, user_label +from .utils import log_event +from .project_review_audit import is_project_review_status_suppressed + + +User = get_user_model() + + +def _choice_name(choice): + return getattr(choice, "name", None) + + +def _date_value(value): + return value.isoformat() if value else None + + +def _user_label(user): + try: + return user_label(user) + except Exception: + return getattr(user, "username", str(user)) + + +def _project_user_summary(project_user): + return project_user_label(project_user) + + +def _allocation_summary(allocation): + return allocation_label(allocation) + + +def _project_review_summary(project_review): + return project_review_label(project_review) + + +def _project_values(project): + return { + "title": project.title, + "status": _choice_name(project.status), + "pi_id": project.pi_id, + "project_code": project.project_code, + } + + +def _cache_old_instance(instance, attr_name): + if not instance.pk: + setattr(instance, attr_name, None) + return + + try: + old_instance = instance.__class__.objects.get(pk=instance.pk) + except instance.__class__.DoesNotExist: + old_instance = None + + setattr(instance, attr_name, old_instance) + + +@receiver(pre_save, sender=UserProfile, dispatch_uid="carc_audit_user_profile_pre_save") +def cache_user_profile(sender, instance, **kwargs): + _cache_old_instance(instance, "_carc_audit_old") + + +@receiver(post_save, sender=UserProfile, dispatch_uid="carc_audit_user_profile_post_save") +def audit_user_profile_pi_status(sender, instance, created, **kwargs): + if created: + return + + old_instance = getattr(instance, "_carc_audit_old", None) + if old_instance is None or old_instance.is_pi == instance.is_pi: + return + + if instance.is_pi: + action = AuditEvent.Action.USER_PI_UPGRADED + message = f"PI status changed for {_user_label(instance.user)}: upgraded to PI" + else: + action = AuditEvent.Action.PI_STATUS_REVOKED + message = f"PI status changed for {_user_label(instance.user)}: PI status revoked" + + log_event( + action, + instance.user, + old_values={"is_pi": old_instance.is_pi}, + new_values={"is_pi": instance.is_pi}, + message=message, + source=AuditEvent.Source.COLDFRONT_SIGNAL, + target_repr=_user_label(instance.user), + ) + + +@receiver(pre_save, sender=User, dispatch_uid="carc_audit_user_pre_save") +def cache_user(sender, instance, **kwargs): + _cache_old_instance(instance, "_carc_audit_old") + + +@receiver(post_save, sender=User, dispatch_uid="carc_audit_user_post_save") +def audit_user_admin_privileges(sender, instance, created, **kwargs): + if created: + return + + old_instance = getattr(instance, "_carc_audit_old", None) + if old_instance is None: + return + + old_values = { + "is_staff": old_instance.is_staff, + "is_superuser": old_instance.is_superuser, + } + new_values = { + "is_staff": instance.is_staff, + "is_superuser": instance.is_superuser, + } + if old_values == new_values: + return + + log_event( + AuditEvent.Action.USER_ADMIN_PRIVILEGES_CHANGED, + instance, + old_values=old_values, + new_values=new_values, + message=( + f"Admin privileges changed for {instance.username}: " + f"is_staff {old_instance.is_staff} -> {instance.is_staff}; " + f"is_superuser {old_instance.is_superuser} -> {instance.is_superuser}" + ), + source=AuditEvent.Source.COLDFRONT_SIGNAL, + target_repr=instance.username, + ) + + +@receiver(allocation_new, dispatch_uid="carc_audit_allocation_new") +def audit_allocation_requested(sender, allocation_pk, **kwargs): + allocation = Allocation.objects.select_related("project", "status").get(pk=allocation_pk) + log_event( + AuditEvent.Action.ALLOCATION_REQUESTED, + allocation, + new_values={ + "status": _choice_name(allocation.status), + "start_date": _date_value(allocation.start_date), + "end_date": _date_value(allocation.end_date), + "project_id": allocation.project_id, + "resource_ids": list(allocation.resources.values_list("pk", flat=True)), + }, + message=f"Allocation requested for {project_label(allocation.project)}", + source=AuditEvent.Source.COLDFRONT_SIGNAL, + target_repr=_allocation_summary(allocation), + ) + + +@receiver(allocation_activate, dispatch_uid="carc_audit_allocation_activate") +def audit_allocation_activated(sender, allocation_pk, **kwargs): + allocation = Allocation.objects.select_related("status").get(pk=allocation_pk) + log_event( + AuditEvent.Action.ALLOCATION_STATUS_CHANGED, + allocation, + new_values={"status": _choice_name(allocation.status)}, + message=f"Allocation approved for {project_label(allocation.project)}", + source=AuditEvent.Source.COLDFRONT_SIGNAL, + target_repr=_allocation_summary(allocation), + ) + + +@receiver(allocation_disable, dispatch_uid="carc_audit_allocation_disable") +def audit_allocation_disabled(sender, allocation_pk, **kwargs): + allocation = Allocation.objects.select_related("status").get(pk=allocation_pk) + status_name = _choice_name(allocation.status) + # allocation_disable only carries allocation_pk. The sending view can know + # whether the user clicked deny or selected revoke, but that context is not + # included in the signal, so record the resulting status without guessing. + log_event( + AuditEvent.Action.ALLOCATION_DISABLED, + allocation, + new_values={"status": status_name}, + message=f"Allocation disabled for {project_label(allocation.project)}; resulting status: {status_name}", + source=AuditEvent.Source.COLDFRONT_SIGNAL, + target_repr=_allocation_summary(allocation), + ) + + +@receiver(allocation_change_created, dispatch_uid="carc_audit_allocation_change_created") +def audit_allocation_change_created(sender, allocation_pk, allocation_change_pk, **kwargs): + allocation_change = AllocationChangeRequest.objects.select_related("allocation", "status").get( + pk=allocation_change_pk + ) + action = AuditEvent.Action.ALLOCATION_CHANGE_REQUESTED + allocation_text = _allocation_summary(allocation_change.allocation) + message = f"Allocation change requested for {allocation_text}" + if allocation_change.end_date_extension: + action = AuditEvent.Action.RENEWAL_REQUESTED + message = ( + f"Renewal requested for {allocation_text}: " + f"{allocation_change.end_date_extension} days" + ) + + log_event( + action, + allocation_change.allocation, + new_values={ + "allocation_change_request_id": allocation_change.pk, + "status": _choice_name(allocation_change.status), + "end_date_extension": allocation_change.end_date_extension, + }, + message=message, + source=AuditEvent.Source.COLDFRONT_SIGNAL, + target_repr=_allocation_summary(allocation_change.allocation), + ) + + +@receiver(allocation_change_approved, dispatch_uid="carc_audit_allocation_change_approved") +def audit_allocation_change_approved(sender, allocation_pk, allocation_change_pk, **kwargs): + allocation_change = AllocationChangeRequest.objects.select_related("allocation", "status").get( + pk=allocation_change_pk + ) + if not allocation_change.end_date_extension: + return + + allocation_text = _allocation_summary(allocation_change.allocation) + values = { + "allocation_change_request_id": allocation_change.pk, + "status": _choice_name(allocation_change.status), + "end_date_extension": allocation_change.end_date_extension, + "allocation_end_date": _date_value(allocation_change.allocation.end_date), + } + previous_end_date = None + if allocation_change.allocation.end_date and allocation_change.end_date_extension: + previous_end_date = allocation_change.allocation.end_date - datetime.timedelta( + days=allocation_change.end_date_extension + ) + log_event( + AuditEvent.Action.RENEWAL_APPROVED, + allocation_change.allocation, + new_values=values, + message=( + f"Renewal approved for {allocation_text}, " + f"end date {_date_value(previous_end_date)} -> {_date_value(allocation_change.allocation.end_date)}" + ), + source=AuditEvent.Source.COLDFRONT_SIGNAL, + target_repr=_allocation_summary(allocation_change.allocation), + ) + log_event( + AuditEvent.Action.ALLOCATION_RENEWED, + allocation_change.allocation, + new_values=values, + message=( + f"{allocation_text} renewed, " + f"end date {_date_value(previous_end_date)} -> {_date_value(allocation_change.allocation.end_date)}" + ), + source=AuditEvent.Source.COLDFRONT_SIGNAL, + target_repr=_allocation_summary(allocation_change.allocation), + ) + + +@receiver(project_activate_user, dispatch_uid="carc_audit_project_activate_user") +def audit_project_user_added(sender, project_user_pk, **kwargs): + project_user = ProjectUser.objects.select_related("project", "user", "role", "status").get(pk=project_user_pk) + # Project.add_user() uses update_or_create(), but project_activate_user only + # provides project_user_pk. Without created/old-state metadata, this signal + # can mean a new membership, reactivation, role update, or status update. + log_event( + AuditEvent.Action.PROJECT_USER_ADDED, + project_user, + new_values={ + "project_id": project_user.project_id, + "user_id": project_user.user_id, + "role": _choice_name(project_user.role), + "status": _choice_name(project_user.status), + }, + message=( + f"User {project_user.user.username} added/activated on " + f"{project_label(project_user.project)} as {project_user.role.name}" + ), + source=AuditEvent.Source.COLDFRONT_SIGNAL, + target_repr=_project_user_summary(project_user), + ) + + +@receiver(project_remove_user, dispatch_uid="carc_audit_project_remove_user") +def audit_project_user_removed(sender, project_user_pk, **kwargs): + project_user = ProjectUser.objects.select_related("project", "user", "role", "status").get(pk=project_user_pk) + log_event( + AuditEvent.Action.PROJECT_USER_REMOVED, + project_user, + old_values={ + "project_id": project_user.project_id, + "user_id": project_user.user_id, + "role": _choice_name(project_user.role), + "status": _choice_name(project_user.status), + }, + message=f"User {project_user.user.username} removed from {project_label(project_user.project)}", + source=AuditEvent.Source.COLDFRONT_SIGNAL, + target_repr=_project_user_summary(project_user), + ) + + +@receiver(pre_save, sender=Project, dispatch_uid="carc_audit_project_pre_save") +def cache_project(sender, instance, **kwargs): + _cache_old_instance(instance, "_carc_audit_old") + + +@receiver(post_save, sender=Project, dispatch_uid="carc_audit_project_post_save") +def audit_project_save(sender, instance, created, **kwargs): + if created: + log_event( + AuditEvent.Action.PROJECT_CREATED, + instance, + new_values=_project_values(instance), + message=f"Project created: {project_label(instance)}", + source=AuditEvent.Source.COLDFRONT_SIGNAL, + target_repr=project_label(instance), + ) + return + + old_instance = getattr(instance, "_carc_audit_old", None) + if old_instance is None: + return + + old_status = _choice_name(old_instance.status) + new_status = _choice_name(instance.status) + if old_status != new_status: + action = ( + AuditEvent.Action.PROJECT_ARCHIVED + if new_status == "Archived" + else AuditEvent.Action.PROJECT_STATUS_CHANGED + ) + log_event( + action, + instance, + old_values={"status": old_status, "pi_id": old_instance.pi_id}, + new_values={"status": new_status, "pi_id": instance.pi_id}, + message=f"Project status changed for {project_label(instance)}: {old_status} -> {new_status}", + source=AuditEvent.Source.COLDFRONT_SIGNAL, + target_repr=project_label(instance), + ) + + if old_instance.pi_id != instance.pi_id: + log_event( + AuditEvent.Action.PROJECT_PI_CHANGED, + instance, + old_values={"pi_id": old_instance.pi_id}, + new_values={"pi_id": instance.pi_id}, + message=f"Project PI changed for {project_label(instance)}", + source=AuditEvent.Source.COLDFRONT_SIGNAL, + target_repr=project_label(instance), + ) + + if created: + return + + if old_instance.force_review or not instance.force_review: + return + + log_event( + AuditEvent.Action.PROJECT_REVIEW_FORCED, + instance, + old_values={ + "force_review": old_instance.force_review, + "requires_review": old_instance.requires_review, + }, + new_values={ + "force_review": instance.force_review, + "requires_review": instance.requires_review, + }, + message=f"Project review forced for {project_label(instance)}", + source=AuditEvent.Source.COLDFRONT_SIGNAL, + target_repr=project_label(instance), + ) + + +@receiver(project_archive, dispatch_uid="carc_audit_project_archive") +def audit_project_archived_signal(sender, project_obj, **kwargs): + if AuditEvent.objects.filter( + action=AuditEvent.Action.PROJECT_ARCHIVED, + source=AuditEvent.Source.COLDFRONT_SIGNAL, + target_type="project.project", + target_id=str(project_obj.pk), + ).exists(): + return + + log_event( + AuditEvent.Action.PROJECT_ARCHIVED, + project_obj, + new_values=_project_values(project_obj), + message=f"Project archived: {project_label(project_obj)}", + source=AuditEvent.Source.COLDFRONT_SIGNAL, + target_repr=project_label(project_obj), + ) + + +@receiver(post_delete, sender=Project, dispatch_uid="carc_audit_project_post_delete") +def audit_project_delete(sender, instance, **kwargs): + log_event( + AuditEvent.Action.PROJECT_DELETED, + instance, + old_values=_project_values(instance), + message=f"Project deleted: {project_label(instance)}", + source=AuditEvent.Source.COLDFRONT_SIGNAL, + target_repr=project_label(instance), + ) + + +@receiver(pre_save, sender=ProjectReview, dispatch_uid="carc_audit_project_review_pre_save") +def cache_project_review(sender, instance, **kwargs): + _cache_old_instance(instance, "_carc_audit_old") + + +@receiver(post_save, sender=ProjectReview, dispatch_uid="carc_audit_project_review_post_save") +def audit_project_review_status_changed(sender, instance, created, **kwargs): + if created or is_project_review_status_suppressed(instance.pk): + return + + old_instance = getattr(instance, "_carc_audit_old", None) + if old_instance is None: + return + + old_status = _choice_name(old_instance.status) + new_status = _choice_name(instance.status) + if old_status == new_status: + return + + log_event( + AuditEvent.Action.PROJECT_REVIEW_STATUS_CHANGED, + instance, + old_values={"status": old_status, "project_id": instance.project_id}, + new_values={"status": new_status, "project_id": instance.project_id}, + message=f"Project review status changed for {project_label(instance.project)}: {old_status} -> {new_status}", + source=AuditEvent.Source.COLDFRONT_SIGNAL, + target_repr=_project_review_summary(instance), + ) + + +@receiver(pre_save, sender=ProjectUser, dispatch_uid="carc_audit_project_user_role_pre_save") +def cache_project_user(sender, instance, **kwargs): + _cache_old_instance(instance, "_carc_audit_old") + + +@receiver(post_save, sender=ProjectUser, dispatch_uid="carc_audit_project_user_role_post_save") +def audit_project_user_role_change(sender, instance, created, **kwargs): + if created: + return + + old_instance = getattr(instance, "_carc_audit_old", None) + if old_instance is None: + return + + old_role = _choice_name(old_instance.role) + new_role = _choice_name(instance.role) + if old_role == new_role: + return + + log_event( + AuditEvent.Action.PROJECT_USER_ROLE_CHANGED, + instance, + old_values={"role": old_role}, + new_values={"role": new_role}, + message=( + f"Project user role changed for {instance.user.username} on " + f"{project_label(instance.project)}: {old_role} -> {new_role}" + ), + source=AuditEvent.Source.COLDFRONT_SIGNAL, + target_repr=_project_user_summary(instance), + ) + + +@receiver(pre_save, sender=Resource, dispatch_uid="carc_audit_resource_pre_save") +def cache_resource(sender, instance, **kwargs): + _cache_old_instance(instance, "_carc_audit_old") + + +@receiver(post_save, sender=Resource, dispatch_uid="carc_audit_resource_post_save") +def audit_resource_save(sender, instance, created, **kwargs): + if created: + log_event( + AuditEvent.Action.RESOURCE_CREATED, + instance, + new_values={"name": instance.name, "resource_type_id": instance.resource_type_id}, + message=f"Resource {instance.name} created", + source=AuditEvent.Source.COLDFRONT_SIGNAL, + target_repr=instance.name, + ) + return + + old_instance = getattr(instance, "_carc_audit_old", None) + if old_instance is None: + return + + tracked_fields = ("name", "description", "is_available", "is_public", "is_allocatable", "requires_payment") + old_values = {field: getattr(old_instance, field) for field in tracked_fields} + new_values = {field: getattr(instance, field) for field in tracked_fields} + old_values["resource_type_id"] = old_instance.resource_type_id + new_values["resource_type_id"] = instance.resource_type_id + old_values["parent_resource_id"] = old_instance.parent_resource_id + new_values["parent_resource_id"] = instance.parent_resource_id + + changed_old = {key: value for key, value in old_values.items() if value != new_values[key]} + changed_new = {key: value for key, value in new_values.items() if old_values[key] != value} + if not changed_old: + return + + log_event( + AuditEvent.Action.RESOURCE_CHANGED, + instance, + old_values=changed_old, + new_values=changed_new, + message=f"Resource {instance.name} changed", + source=AuditEvent.Source.COLDFRONT_SIGNAL, + target_repr=instance.name, + ) + + +@receiver(post_delete, sender=Resource, dispatch_uid="carc_audit_resource_post_delete") +def audit_resource_delete(sender, instance, **kwargs): + log_event( + AuditEvent.Action.RESOURCE_DELETED, + instance, + old_values={"name": instance.name, "resource_type_id": instance.resource_type_id}, + message=f"Resource {instance.name} deleted", + source=AuditEvent.Source.COLDFRONT_SIGNAL, + target_repr=instance.name, + ) diff --git a/tests.py b/tests.py new file mode 100644 index 0000000..860609a --- /dev/null +++ b/tests.py @@ -0,0 +1,574 @@ +from io import StringIO +from types import SimpleNamespace + +from django.contrib.admin.models import ADDITION, CHANGE, DELETION, LogEntry +from django.contrib.auth import get_user_model +from django.contrib.contenttypes.models import ContentType +from django.core.management import call_command +from django.test import TestCase +from django.utils.html import strip_tags +from django.utils import timezone + +from coldfront.core.allocation.models import Allocation, AllocationStatusChoice +from coldfront.core.field_of_science.models import FieldOfScience +from coldfront.core.project.models import Project, ProjectStatusChoice +from coldfront.core.resource.models import Resource, ResourceType +from coldfront.core.user.models import UserProfile + +from .backfill import RUN_NAME, _allocation_repr, _project_user_repr +from .models import AuditEvent, BackfillRun +from .resolvers import allocation_label + + +class BackfillCmmcAuditCommandTests(TestCase): + def setUp(self): + self.user = get_user_model().objects.create_user( + username="audit_backfill_admin", + email="audit_backfill_admin@example.test", + password="unused", + is_staff=True, + ) + self.content_type = ContentType.objects.get_for_model(get_user_model()) + + def _log_entry(self, action_flag, object_id, object_repr, change_message="", content_type=None): + return LogEntry.objects.create( + user=self.user, + content_type=content_type or self.content_type, + object_id=str(object_id), + object_repr=object_repr, + action_flag=action_flag, + change_message=change_message, + ) + + def _run_command(self, *args): + out = StringIO() + call_command("backfill_cmmc_audit", *args, stdout=out) + return out.getvalue() + + def test_dry_run_creates_no_rows(self): + self._log_entry(ADDITION, self.user.pk, self.user.username) + + output = self._run_command("--dry-run") + + self.assertIn("Mode: dry-run", output) + self.assertEqual(AuditEvent.objects.count(), 0) + self.assertEqual(BackfillRun.objects.count(), 0) + + def test_commit_creates_reconstructed_rows_from_admin_log(self): + log_entry = self._log_entry(ADDITION, self.user.pk, self.user.username) + + self._run_command("--commit") + + event = AuditEvent.objects.get() + self.assertEqual(event.action, AuditEvent.Action.ADMIN_ADDITION) + self.assertEqual(event.source, AuditEvent.Source.DJANGO_ADMIN_LOG) + self.assertEqual(event.source_id, str(log_entry.pk)) + self.assertEqual(event.event_time, log_entry.action_time) + self.assertTrue(event.is_reconstructed) + self.assertEqual(BackfillRun.objects.get(name=RUN_NAME).created_events, 1) + + def test_second_commit_without_force_does_not_rerun(self): + self._log_entry(ADDITION, self.user.pk, self.user.username) + self._run_command("--commit") + + output = self._run_command("--commit") + + self.assertIn("use --force to rerun", output) + self.assertEqual(AuditEvent.objects.count(), 1) + self.assertEqual(BackfillRun.objects.count(), 1) + + def test_force_rerun_does_not_duplicate_rows(self): + self._log_entry(ADDITION, self.user.pk, self.user.username) + self._run_command("--commit") + + output = self._run_command("--commit", "--force") + + self.assertIn("Duplicates skipped: 1", output) + self.assertEqual(AuditEvent.objects.count(), 1) + self.assertEqual(BackfillRun.objects.count(), 1) + + def test_admin_add_change_delete_mapping(self): + target = get_user_model().objects.create_user(username="mapped_user") + self._log_entry(ADDITION, target.pk, target.username) + self._log_entry(CHANGE, target.pk, target.username, '[{"changed": {"fields": ["Email"]}}]') + self._log_entry(DELETION, target.pk, target.username) + + self._run_command("--commit") + + self.assertEqual( + set(AuditEvent.objects.values_list("action", flat=True)), + { + AuditEvent.Action.ADMIN_ADDITION, + AuditEvent.Action.ADMIN_CHANGE, + AuditEvent.Action.ADMIN_DELETION, + }, + ) + + def test_duplicate_source_id_action_is_skipped(self): + log_entry = self._log_entry(CHANGE, self.user.pk, self.user.username) + AuditEvent.objects.create( + action=AuditEvent.Action.ADMIN_CHANGE, + actor=self.user, + target_type="auth.user", + target_id=str(self.user.pk), + target_repr=self.user.username, + source=AuditEvent.Source.DJANGO_ADMIN_LOG, + source_id=str(log_entry.pk), + event_time=timezone.now(), + is_reconstructed=True, + ) + + output = self._run_command("--dry-run") + + self.assertIn("Duplicates skipped: 1", output) + self.assertEqual(AuditEvent.objects.count(), 1) + + def test_project_deletion_from_admin_log_backfills_project_deleted(self): + project_content_type = ContentType.objects.get_for_model(Project) + self._log_entry( + DELETION, + 2016540, + "Quantum Materials Access Review", + "", + content_type=project_content_type, + ) + + self._run_command("--commit") + + event = AuditEvent.objects.get(action=AuditEvent.Action.PROJECT_DELETED) + self.assertEqual(event.target_type, "project.project") + self.assertEqual(event.target_id, "2016540") + self.assertEqual( + event.target_repr, + "Deleted project: Quantum Materials Access Review (id 2016540)", + ) + self.assertEqual(event.target_summary, event.target_repr) + + def test_live_user_profile_pi_event_display_uses_user_label(self): + target = get_user_model().objects.create_user( + username="live_pi", + first_name="Live", + last_name="Person", + ) + target.userprofile.is_pi = True + + target.userprofile.save() + + event = AuditEvent.objects.get(action=AuditEvent.Action.USER_PI_UPGRADED) + self.assertEqual(event.target_type, "auth.user") + self.assertEqual(event.target_id, str(target.pk)) + self.assertEqual(event.target_repr, "live_pi (Live Person)") + self.assertIn("PI status changed for live_pi (Live Person): upgraded to PI", event.message) + + def test_reconstructed_user_profile_logentry_display_uses_related_user_label(self): + target = get_user_model().objects.create_user( + username="profile_pi", + first_name="Profile", + last_name="Person", + ) + profile_content_type = ContentType.objects.get_for_model(UserProfile) + self._log_entry( + CHANGE, + target.userprofile.pk, + f"UserProfile object ({target.userprofile.pk})", + '[{"changed": {"fields": ["Is pi"]}}]', + content_type=profile_content_type, + ) + + self._run_command("--commit") + + event = AuditEvent.objects.get(action=AuditEvent.Action.USER_PI_UPGRADED) + self.assertEqual(event.target_type, "user.userprofile") + self.assertEqual(event.target_id, str(target.userprofile.pk)) + self.assertEqual(event.target_repr, "profile_pi (Profile Person)") + self.assertEqual( + event.message, + "Reconstructed from Django admin log: PI status changed for " + "profile_pi (Profile Person); direction unknown", + ) + + def test_reconstructed_user_profile_logentry_uses_missing_profile_fallback_label(self): + profile_content_type = ContentType.objects.get_for_model(UserProfile) + self._log_entry( + CHANGE, + 999999, + "UserProfile object (999999)", + '[{"changed": {"fields": ["Is pi"]}}]', + content_type=profile_content_type, + ) + + self._run_command("--commit") + + event = AuditEvent.objects.get(action=AuditEvent.Action.USER_PI_UPGRADED) + self.assertEqual(event.target_repr, "UserProfile id 999999 (user not found)") + self.assertEqual( + event.message, + "Reconstructed from Django admin log: PI status changed for " + "UserProfile id 999999 (user not found); direction unknown", + ) + + def test_one_resource_allocation_label(self): + project = self._project(title="Quantum Materials Access Review") + allocation = self._allocation(project=project) + self._resource("CARC Project Scratch", allocation=allocation) + + label = allocation_label(allocation) + + self.assertEqual(label, f"CARC Project Scratch allocation for Project: {project.title}") + + def test_storage_allocation_label(self): + project = self._project(title="Viral sequencing") + allocation = self._allocation(project=project) + self._resource("CARC Project Storage", allocation=allocation) + + label = allocation_label(allocation) + + self.assertEqual(label, f"CARC Project Storage allocation for Project: {project.title}") + + def test_allocation_label_omits_resource_when_missing(self): + project = self._project(title="Quantum Materials Access Review") + allocation = self._allocation(project=project) + + label = allocation_label(allocation) + + self.assertEqual(label, f"Allocation for Project: {project.title}") + + def test_allocation_target_summary_uses_label_without_allocation_id(self): + project = self._project(title="Quantum Materials Access Review") + allocation = self._allocation(project=project) + self._resource("CARC Project Scratch", allocation=allocation) + event = AuditEvent.objects.create( + action=AuditEvent.Action.ALLOCATION_STATUS_CHANGED, + actor=self.user, + target_type="allocation.allocation", + target_id=str(allocation.pk), + target_repr=f"Allocation {allocation.pk}, stale label", + source=AuditEvent.Source.COLDFRONT_HISTORY, + event_time=timezone.now(), + ) + + self.assertEqual(event.target_summary, f"CARC Project Scratch allocation for Project: {project.title}") + self.assertNotIn(str(allocation.pk), event.target_summary) + + def test_historical_allocation_label_resolves_project_id_without_allocation_id(self): + project = self._project(title="Quantum Materials Access Review") + history = SimpleNamespace(id=1471, project_id=project.pk) + + label = _allocation_repr(history) + + self.assertEqual(label, f"Allocation for Project: {project.title}") + + def test_historical_allocation_label_marks_missing_project_id_with_resource_name(self): + history = SimpleNamespace(id=1471, project_id=2016541, resource_names=["CARC Project Scratch"]) + + label = _allocation_repr(history) + + self.assertEqual(label, "CARC Project Scratch allocation for Project id 2016541 (not found)") + + def test_historical_allocation_label_marks_missing_project_id(self): + history = SimpleNamespace(id=1471, project_id=2016541) + + label = _allocation_repr(history) + + self.assertEqual(label, "Allocation for Project id 2016541 (not found)") + + def test_historical_project_user_label_resolves_project_id(self): + project = self._project(title="Quantum Materials Access Review") + history = SimpleNamespace(id=2001, project_id=project.pk, user_id=self.user.pk) + + label = _project_user_repr(history) + + self.assertIn(f"on Project: {project.title}", label) + self.assertIn(f"{self.user.username} (id {self.user.pk})", label) + + def test_historical_project_user_label_marks_missing_project_id(self): + history = SimpleNamespace(id=2001, project_id=2016541, user_id=self.user.pk) + + label = _project_user_repr(history) + + self.assertIn("Project id 2016541 (not found)", label) + self.assertIn(f"{self.user.username} (id {self.user.pk})", label) + + def _project(self, title): + status = ProjectStatusChoice.objects.create(name=f"Active {title}") + field_of_science = FieldOfScience.objects.create(description=f"FOS {title}") + return Project.objects.create( + title=title, + pi=self.user, + description="A sufficiently detailed project description.", + field_of_science=field_of_science, + status=status, + ) + + def _allocation(self, project): + status = AllocationStatusChoice.objects.create(name=f"Active {project.title}") + return Allocation.objects.create( + project=project, + status=status, + justification="Testing audit allocation labels.", + ) + + def _resource(self, name, allocation): + resource_type, _ = ResourceType.objects.get_or_create( + name="Storage", + defaults={"description": "Storage resource type"}, + ) + resource = Resource.objects.create( + name=name, + description="Testing audit allocation labels.", + resource_type=resource_type, + ) + allocation.resources.add(resource) + return resource + + +class AuditEventProjectDisplayTests(TestCase): + def test_missing_project_id_display_is_explicit(self): + event = AuditEvent.objects.create( + action=AuditEvent.Action.PROJECT_DELETED, + target_type="project.project", + target_id="2016540", + event_time=timezone.now(), + ) + + self.assertEqual(event.target_summary, "Project id 2016540 (not found)") + + def test_missing_deleted_project_uses_stored_target_repr(self): + event = AuditEvent.objects.create( + action=AuditEvent.Action.PROJECT_DELETED, + target_type="project.project", + target_id="2016540", + target_repr="Deleted project: Quantum Materials Access Review (id 2016540)", + event_time=timezone.now(), + ) + + self.assertEqual(event.target_summary, "Deleted project: Quantum Materials Access Review (id 2016540)") + + def test_project_archive_status_transition_creates_readable_event(self): + user = get_user_model().objects.create_user(username="project_archive_pi") + project = self._project(user, "Quantum Materials Access Review", "New") + archived = ProjectStatusChoice.objects.create(name="Archived") + AuditEvent.objects.all().delete() + + project.status = archived + project.save() + + event = AuditEvent.objects.get(action=AuditEvent.Action.PROJECT_ARCHIVED) + self.assertEqual(event.target_repr, f"Project: {project.title}") + self.assertEqual(event.old_values["status"], "New") + self.assertEqual(event.new_values["status"], "Archived") + self.assertEqual( + event.message, + f"Project status changed for Project: {project.title}: New -> Archived", + ) + + def test_project_pi_change_creates_readable_event(self): + old_pi = get_user_model().objects.create_user(username="old_project_pi") + new_pi = get_user_model().objects.create_user(username="new_project_pi") + project = self._project(old_pi, "Quantum Materials Access Review", "Active") + AuditEvent.objects.all().delete() + + project.pi = new_pi + project.save() + + event = AuditEvent.objects.get(action=AuditEvent.Action.PROJECT_PI_CHANGED) + self.assertEqual(event.target_repr, f"Project: {project.title}") + self.assertEqual(event.old_values["pi_id"], old_pi.pk) + self.assertEqual(event.new_values["pi_id"], new_pi.pk) + self.assertEqual(event.message, f"Project PI changed for Project: {project.title}") + + def _project(self, user, title, status_name): + status = ProjectStatusChoice.objects.create(name=f"{status_name} {title}") + status.name = status_name + status.save(update_fields=["name"]) + field_of_science = FieldOfScience.objects.create(description=f"FOS {title} {status_name}") + return Project.objects.create( + title=title, + pi=user, + description="A sufficiently detailed project description.", + field_of_science=field_of_science, + status=status, + ) + + +class AuditEventAllocationDisplayTests(TestCase): + def test_new_to_active_reads_as_approved(self): + event = self._event( + AuditEvent.Action.ALLOCATION_STATUS_CHANGED, + old_values={"status": "New"}, + new_values={"status": "Active"}, + ) + + self.assertEqual(event.transition_display, "New \u2192 Active") + self.assertEqual(event.action_detail_display, "Allocation approved (New \u2192 Active)") + + def test_active_to_revoked_reads_as_revoked(self): + event = self._event( + AuditEvent.Action.ALLOCATION_DISABLED, + old_values={"status": "Active"}, + new_values={"status": "Revoked"}, + ) + + self.assertEqual(event.transition_display, "Active \u2192 Revoked") + self.assertEqual(event.action_detail_display, "Allocation revoked (Active \u2192 Revoked)") + + def test_missing_old_new_status_falls_back_to_action_label(self): + event = self._event(AuditEvent.Action.ALLOCATION_STATUS_CHANGED) + + self.assertEqual(event.transition_display, "") + self.assertEqual(event.action_detail_display, "Allocation status changed") + + def test_reconstructed_row_uses_status_transition(self): + event = self._event( + AuditEvent.Action.ALLOCATION_STATUS_CHANGED, + old_values={"status": "Pending"}, + new_values={"status": "Active"}, + source=AuditEvent.Source.COLDFRONT_HISTORY, + is_reconstructed=True, + ) + + self.assertEqual(event.transition_display, "Pending \u2192 Active") + self.assertEqual(event.action_detail_display, "Allocation status changed (Pending \u2192 Active)") + + def test_live_row_uses_status_transition(self): + event = self._event( + AuditEvent.Action.ALLOCATION_DISABLED, + old_values={"status": "New"}, + new_values={"status": "Denied"}, + source=AuditEvent.Source.COLDFRONT_SIGNAL, + is_reconstructed=False, + ) + + self.assertEqual(event.transition_display, "New \u2192 Denied") + self.assertEqual(event.action_detail_display, "Allocation disabled (New \u2192 Denied)") + + def test_allocation_renewed_uses_extension_days(self): + event = self._event( + AuditEvent.Action.ALLOCATION_RENEWED, + new_values={"end_date_extension": 365}, + ) + + self.assertEqual(event.transition_display, "+365 days") + self.assertEqual(event.action_detail_display, "Allocation renewed (+365 days)") + + def _event(self, action, **kwargs): + defaults = { + "target_type": "allocation.allocation", + "target_id": "1", + "target_repr": "Allocation for Project: Example", + "event_time": timezone.now(), + "source": AuditEvent.Source.RUNTIME, + } + defaults.update(kwargs) + return AuditEvent.objects.create(action=action, **defaults) + + +class AuditEventEvidenceDisplayTests(TestCase): + def test_ac_au_coldfront_history_display(self): + event = self._event( + control_family="AC,AU", + evidence_category=AuditEvent.EvidenceCategory.AC_AU, + source=AuditEvent.Source.COLDFRONT_HISTORY, + ) + + self.assertEqual(event.evidence_area_display, "Access Control + Audit Logging") + self.assertEqual(event.source_display, "ColdFront history table") + self.assertEqual(event.evidence_display, "Access Control + Audit Logging\nSource: ColdFront history table") + self.assertIn("Access Control + Audit Logging", self._admin_evidence_text(event)) + self.assertIn("Source: ColdFront history table", self._admin_evidence_text(event)) + + def test_cm_django_admin_log_display(self): + event = self._event( + control_family="CM", + evidence_category=AuditEvent.EvidenceCategory.CM_CONFIGURATION_MANAGEMENT, + source=AuditEvent.Source.DJANGO_ADMIN_LOG, + ) + + self.assertEqual(event.evidence_area_display, "Configuration Management") + self.assertEqual(event.source_display, "Django admin log") + self.assertEqual(event.evidence_display, "Configuration Management\nSource: Django admin log") + self.assertIn("Configuration Management", self._admin_evidence_text(event)) + self.assertIn("Source: Django admin log", self._admin_evidence_text(event)) + + def test_unknown_source_falls_back_clearly(self): + event = self._event( + control_family="AC", + evidence_category=AuditEvent.EvidenceCategory.AC_ACCESS_CONTROL, + source="unexpected_source", + ) + + self.assertEqual(event.source_display, "Unknown source (unexpected_source)") + self.assertIn("Unknown source (unexpected_source)", self._admin_evidence_text(event)) + + def _event(self, **kwargs): + defaults = { + "action": AuditEvent.Action.ADMIN_CHANGE, + "target_type": "auth.user", + "target_id": "1", + "target_repr": "Example user", + "event_time": timezone.now(), + } + defaults.update(kwargs) + return AuditEvent.objects.create(**defaults) + + def _admin_evidence_text(self, event): + from .admin import AuditEventAdmin + + admin_instance = AuditEventAdmin(AuditEvent, None) + return strip_tags(str(admin_instance.evidence_display(event))) + + +class ActorTypeFilterTests(TestCase): + def setUp(self): + self.user = get_user_model().objects.create_user(username="named_actor") + self.named_event = self._event(actor=self.user) + self.programmatic_event = self._event(actor=None) + self.reconstructed_programmatic_event = self._event( + actor=None, + is_reconstructed=True, + source=AuditEvent.Source.COLDFRONT_HISTORY, + ) + + def test_named_actor_filter_returns_only_rows_with_actor(self): + queryset = self._filter("named") + + self.assertEqual(list(queryset), [self.named_event]) + + def test_programmatic_filter_returns_rows_without_actor(self): + queryset = self._filter("programmatic") + + self.assertEqual( + set(queryset), + {self.programmatic_event, self.reconstructed_programmatic_event}, + ) + + def test_programmatic_actor_display_is_unchanged(self): + self.assertEqual(self.programmatic_event.actor_display, "programmatic") + self.assertEqual(self.reconstructed_programmatic_event.actor_display, "programmatic") + + def _filter(self, value): + from .admin import ActorTypeFilter, AuditEventAdmin + + model_admin = AuditEventAdmin(AuditEvent, None) + filter_instance = ActorTypeFilter(None, {"actor_type": value}, AuditEvent, model_admin) + filter_instance.used_parameters = {"actor_type": value} + return filter_instance.queryset(None, AuditEvent.objects.order_by("pk")) + + def _event(self, **kwargs): + defaults = { + "action": AuditEvent.Action.ADMIN_CHANGE, + "target_type": "auth.user", + "target_id": "1", + "target_repr": "Example user", + "event_time": timezone.now(), + } + defaults.update(kwargs) + return AuditEvent.objects.create(**defaults) + + +class AuditEventAdminListDisplayTests(TestCase): + def test_transition_column_is_not_in_list_display(self): + from .admin import AuditEventAdmin + + self.assertIn("action_detail_display", AuditEventAdmin.list_display) + self.assertNotIn("transition_display", AuditEventAdmin.list_display) diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..fc160eb --- /dev/null +++ b/utils.py @@ -0,0 +1,196 @@ +from .middleware import get_current_actor, get_current_request +from .models import AuditEvent +from .resolvers import user_profile_label + + +ACTION_EVIDENCE_MAP = { + AuditEvent.Action.ADMIN_ADDITION: ( + AuditEvent.EvidenceCategory.AU_AUDIT_ACCOUNTABILITY, + "AU", + ), + AuditEvent.Action.ADMIN_CHANGE: ( + AuditEvent.EvidenceCategory.AU_AUDIT_ACCOUNTABILITY, + "AU", + ), + AuditEvent.Action.ADMIN_DELETION: ( + AuditEvent.EvidenceCategory.AU_AUDIT_ACCOUNTABILITY, + "AU", + ), + AuditEvent.Action.USER_PI_UPGRADED: ( + AuditEvent.EvidenceCategory.AC_ACCESS_CONTROL, + "AC", + ), + AuditEvent.Action.PI_STATUS_REVOKED: ( + AuditEvent.EvidenceCategory.AC_ACCESS_CONTROL, + "AC", + ), + AuditEvent.Action.USER_ADMIN_PRIVILEGES_CHANGED: ( + AuditEvent.EvidenceCategory.AC_ACCESS_CONTROL, + "AC", + ), + AuditEvent.Action.ALLOCATION_REQUESTED: ( + AuditEvent.EvidenceCategory.AC_AU, + "AC,AU", + ), + AuditEvent.Action.ALLOCATION_CHANGE_REQUESTED: ( + AuditEvent.EvidenceCategory.AC_AU, + "AC,AU", + ), + AuditEvent.Action.ALLOCATION_STATUS_CHANGED: ( + AuditEvent.EvidenceCategory.AC_AU, + "AC,AU", + ), + AuditEvent.Action.ALLOCATION_DISABLED: ( + AuditEvent.EvidenceCategory.AC_AU, + "AC,AU", + ), + AuditEvent.Action.RENEWAL_REQUESTED: ( + AuditEvent.EvidenceCategory.AC_AU, + "AC,AU", + ), + AuditEvent.Action.RENEWAL_APPROVED: ( + AuditEvent.EvidenceCategory.AC_AU, + "AC,AU", + ), + AuditEvent.Action.ALLOCATION_RENEWED: ( + AuditEvent.EvidenceCategory.AC_AU, + "AC,AU", + ), + AuditEvent.Action.PROJECT_USER_ADDED: ( + AuditEvent.EvidenceCategory.AC_ACCESS_CONTROL, + "AC", + ), + AuditEvent.Action.PROJECT_USER_REMOVED: ( + AuditEvent.EvidenceCategory.AC_ACCESS_CONTROL, + "AC", + ), + AuditEvent.Action.PROJECT_USER_ROLE_CHANGED: ( + AuditEvent.EvidenceCategory.AC_ACCESS_CONTROL, + "AC", + ), + AuditEvent.Action.PROJECT_CREATED: ( + AuditEvent.EvidenceCategory.AU_AUDIT_ACCOUNTABILITY, + "AU", + ), + AuditEvent.Action.PROJECT_ARCHIVED: ( + AuditEvent.EvidenceCategory.AC_AU, + "AC,AU", + ), + AuditEvent.Action.PROJECT_DELETED: ( + AuditEvent.EvidenceCategory.AC_AU, + "AC,AU", + ), + AuditEvent.Action.PROJECT_STATUS_CHANGED: ( + AuditEvent.EvidenceCategory.AC_AU, + "AC,AU", + ), + AuditEvent.Action.PROJECT_PI_CHANGED: ( + AuditEvent.EvidenceCategory.AC_ACCESS_CONTROL, + "AC", + ), + AuditEvent.Action.PROJECT_REVIEW_FORCED: ( + AuditEvent.EvidenceCategory.AU_AUDIT_ACCOUNTABILITY, + "AU", + ), + AuditEvent.Action.PROJECT_REVIEW_SUBMITTED: ( + AuditEvent.EvidenceCategory.AU_AUDIT_ACCOUNTABILITY, + "AU", + ), + AuditEvent.Action.PROJECT_REVIEW_COMPLETED: ( + AuditEvent.EvidenceCategory.AU_AUDIT_ACCOUNTABILITY, + "AU", + ), + AuditEvent.Action.PROJECT_REVIEW_STATUS_CHANGED: ( + AuditEvent.EvidenceCategory.AU_AUDIT_ACCOUNTABILITY, + "AU", + ), + AuditEvent.Action.RESOURCE_CREATED: ( + AuditEvent.EvidenceCategory.CM_CONFIGURATION_MANAGEMENT, + "CM", + ), + AuditEvent.Action.RESOURCE_CHANGED: ( + AuditEvent.EvidenceCategory.CM_CONFIGURATION_MANAGEMENT, + "CM", + ), + AuditEvent.Action.RESOURCE_DELETED: ( + AuditEvent.EvidenceCategory.CM_CONFIGURATION_MANAGEMENT, + "CM", + ), +} + + +def _get_request_ip(request): + forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR") + if forwarded_for: + return forwarded_for.split(",")[0].strip() or None + return request.META.get("REMOTE_ADDR") or None + + +def _get_target_repr(target): + if target._meta.label_lower == "user.userprofile": + return user_profile_label(target)[:255] + try: + return str(target)[:255] + except Exception: + meta = target._meta + pk = getattr(target, "pk", "") + return f"{meta.app_label}.{meta.model_name} object ({pk})"[:255] + + +def log_event( + action, + target, + old_values=None, + new_values=None, + message="", + actor=None, + request=None, + evidence_category="", + control_family="", + source=AuditEvent.Source.RUNTIME, + source_id="", + is_reconstructed=False, + event_time=None, + target_repr="", +): + if target is None or isinstance(target, AuditEvent): + return None + + request = request if request is not None else get_current_request() + actor = actor if actor is not None else get_current_actor() + if actor is not None and getattr(actor, "is_authenticated", False) is False: + actor = None + + if not evidence_category or not control_family: + default_evidence_category, default_control_family = ACTION_EVIDENCE_MAP.get(action, ("", "")) + evidence_category = evidence_category or default_evidence_category + control_family = control_family or default_control_family + + meta = target._meta + request_path = "" + ip_address = None + if request is not None: + request_path = getattr(request, "path", "")[:255] + ip_address = _get_request_ip(request) + + values = { + "actor": actor, + "action": action, + "evidence_category": evidence_category, + "control_family": control_family, + "target_type": f"{meta.app_label}.{meta.model_name}", + "target_id": str(getattr(target, "pk", "") or "")[:128], + "target_repr": (target_repr or _get_target_repr(target))[:255], + "old_values": old_values or {}, + "new_values": new_values or {}, + "message": str(message or ""), + "source": source, + "source_id": str(source_id or "")[:128], + "is_reconstructed": is_reconstructed, + "request_path": request_path, + "ip_address": ip_address, + } + if event_time is not None: + values["event_time"] = event_time + + return AuditEvent.objects.create(**values)