"""Reconstruct historical ``AuditEvent`` rows from existing records.

Two sources are read:

* Django admin ``LogEntry`` rows (``backfill_admin_log``).
* Historical model tables looked up by label, e.g.
  ``project.HistoricalProject`` (``backfill_history``).

Every event created here is stored with ``is_reconstructed=True`` and is
de-duplicated on ``(source, source_id, action)``, so re-running the backfill
does not create the same event twice.
"""
import json
from collections import Counter

from django.apps import apps
from django.contrib.admin.models import ADDITION, CHANGE, DELETION, LogEntry
from django.utils import timezone

from .models import AuditEvent
from .resolvers import (
    allocation_label,
    allocation_label_from_parts,
    project_label,
    project_label_from_id,
    project_user_label,
    user_label,
    user_profile_label_from_id,
)
from .utils import ACTION_EVIDENCE_MAP

RUN_NAME = "initial_historical_backfill"
ADMIN_SOURCE = AuditEvent.Source.DJANGO_ADMIN_LOG
HISTORY_SOURCE = AuditEvent.Source.COLDFRONT_HISTORY


class BackfillReport:
    """Running tallies collected while a backfill executes."""

    def __init__(self):
        # Rows examined, keyed by source label (admin log or history model label).
        self.examined = Counter()
        # Events written, keyed by action (real runs only).
        self.created = Counter()
        # Events that would have been written, keyed by action (dry runs only).
        self.would_create = Counter()
        # Events skipped because an identical (source, source_id, action) exists.
        self.duplicates = 0
        # Source rows from which no event could be derived.
        self.ambiguous = 0
        # Totals by action and by evidence category, counted in both modes.
        self.by_action = Counter()
        self.by_evidence = Counter()

    def record_examined(self, source):
        """Count one examined row for ``source``."""
        self.examined[source] += 1

    def record_duplicate(self):
        """Count one event skipped as a duplicate."""
        self.duplicates += 1

    def record_ambiguous(self):
        """Count one source row that yielded no event."""
        self.ambiguous += 1

    def record_event(self, action, evidence_category, dry_run):
        """Count one event, either as created or (dry run) as would-create.

        A falsy ``evidence_category`` is tallied under ``"unclassified"``.
        """
        if dry_run:
            self.would_create[action] += 1
        else:
            self.created[action] += 1
        self.by_action[action] += 1
        self.by_evidence[evidence_category or "unclassified"] += 1

    @property
    def created_events(self):
        """Total number of events created across all actions."""
        return sum(self.created.values())

    @property
    def would_create_events(self):
        """Total number of events a dry run would have created."""
        return sum(self.would_create.values())


def _json_message(change_message):
    """Return ``change_message`` as a string.

    Falsy input becomes ``""``, strings are returned unchanged and anything
    else is JSON-encoded (non-serializable values fall back to ``str``).
    """
    if not change_message:
        return ""
    if isinstance(change_message, str):
        return change_message
    return json.dumps(change_message, default=str)


def _changed_fields(change_message):
    """Extract the changed field names from an admin change message.

    ``change_message`` may be a JSON string or an already-decoded value. The
    expected shape is a list of dicts where ``entry["changed"]["fields"]`` is
    a list of names. Anything that does not match that shape (plain-text
    messages, JSON scalars, malformed entries) contributes no fields instead
    of raising.
    """
    if not change_message:
        return []
    try:
        entries = json.loads(change_message) if isinstance(change_message, str) else change_message
    except (TypeError, ValueError):
        return []
    # Valid JSON can still decode to a scalar, None or a dict; only a sequence
    # of entries is meaningful here.
    if not isinstance(entries, (list, tuple)):
        return []

    fields = []
    for entry in entries:
        changed = entry.get("changed") if isinstance(entry, dict) else None
        if not isinstance(changed, dict):
            continue
        names = changed.get("fields")
        if isinstance(names, (list, tuple)):
            # Callers lower-case and join these, so keep strings only.
            fields.extend(name for name in names if isinstance(name, str))
    return fields


def _has_changed_field(change_message, field_name):
    """Return True when ``field_name`` is among the changed fields.

    The comparison ignores case and treats ``_`` and a space as equal.
    """
    normalized = {field.lower().replace("_", " ") for field in _changed_fields(change_message)}
    return field_name.lower().replace("_", " ") in normalized


def _target_from_logentry(log_entry):
    """Return ``(app_label, model_name, object_id, object_repr)`` for a log entry.

    ``app_label`` and ``model_name`` are empty strings when the entry has no
    content type; a missing ``object_id`` becomes ``""``.
    """
    object_id = log_entry.object_id or ""
    if not log_entry.content_type_id:
        return "", "", object_id, log_entry.object_repr
    content_type = log_entry.content_type
    return content_type.app_label, content_type.model, object_id, log_entry.object_repr


def _defaults_for_action(action):
    """Look up ``(evidence_category, control_family)`` for ``action``.

    Actions absent from ``ACTION_EVIDENCE_MAP`` yield ``("", "")``.
    """
    evidence_category, control_family = ACTION_EVIDENCE_MAP.get(action, ("", ""))
    return evidence_category, control_family


def _create_event(
    *,
    action,
    source,
    source_id,
    event_time,
    actor,
    target_type,
    target_id,
    target_repr,
    message,
    old_values=None,
    new_values=None,
    dry_run,
    report,
):
    """Create one reconstructed ``AuditEvent`` unless it already exists.

    An event counts as existing when a row with the same ``source``,
    ``source_id`` and ``action`` is present; that case is tallied as a
    duplicate. Otherwise the event is tallied on ``report`` and, unless
    ``dry_run`` is set, written to the database.

    Returns the created ``AuditEvent``, or ``None`` for duplicates and dry
    runs.
    """
    # Use the same truncated key for the lookup and the stored value; looking
    # up the untruncated string would never match a stored key that had been
    # cut to 128 characters, so such events would be re-created on every run.
    source_key = str(source_id)[:128]
    if AuditEvent.objects.filter(source=source, source_id=source_key, action=action).exists():
        report.record_duplicate()
        return None

    evidence_category, control_family = _defaults_for_action(action)
    report.record_event(action, evidence_category, dry_run)
    if dry_run:
        return None

    return AuditEvent.objects.create(
        actor=actor,
        action=action,
        evidence_category=evidence_category,
        control_family=control_family,
        target_type=target_type,
        target_id=str(target_id or "")[:128],
        target_repr=str(target_repr or "")[:255],
        old_values=old_values or {},
        new_values=new_values or {},
        message=message,
        source=source,
        source_id=source_key,
        is_reconstructed=True,
        event_time=event_time,
    )


def _admin_action(log_entry):
    """Map the entry's ``action_flag`` to an admin action, or ``None`` if unknown."""
    if log_entry.action_flag == ADDITION:
        return AuditEvent.Action.ADMIN_ADDITION
    if log_entry.action_flag == CHANGE:
        return AuditEvent.Action.ADMIN_CHANGE
    if log_entry.action_flag == DELETION:
        return AuditEvent.Action.ADMIN_DELETION
    return None


def _admin_message(log_entry, action, object_repr=None):
    """Build the message for a generic admin add/change/delete event.

    ``object_repr`` overrides the entry's own ``object_repr`` when truthy. The
    change message is appended only when it is non-empty.
    """
    change_message = _json_message(log_entry.change_message)
    target_repr = object_repr or log_entry.object_repr
    if action == AuditEvent.Action.ADMIN_ADDITION:
        verb = "added"
    elif action == AuditEvent.Action.ADMIN_DELETION:
        verb = "deleted"
    else:
        verb = "changed"
    suffix = f" Change message: {change_message}" if change_message else ""
    return f"Reconstructed from Django admin log: admin {verb} {target_repr}.{suffix}"


def _admin_target_repr(log_entry, target_type, object_id, object_repr):
    """Choose the display label for the target of an admin log entry."""
    if target_type == "user.userprofile":
        return user_profile_label_from_id(object_id)
    if target_type == "project.project" and log_entry.action_flag == DELETION:
        return f"Deleted project: {object_repr} (id {object_id})"
    return object_repr


def backfill_admin_log(*, dry_run, report):
    """Create events for every Django admin ``LogEntry``, in primary-key order.

    Each entry with a recognised action flag yields one generic admin event
    plus any semantic events from ``_semantic_events_from_logentry``. Entries
    with an unrecognised flag are tallied as ambiguous.
    """
    for log_entry in LogEntry.objects.select_related("content_type", "user").order_by("pk"):
        report.record_examined("django_admin_log")
        action = _admin_action(log_entry)
        if action is None:
            report.record_ambiguous()
            continue

        app_label, model_name, object_id, object_repr = _target_from_logentry(log_entry)
        target_type = f"{app_label}.{model_name}" if app_label and model_name else ""
        target_repr = _admin_target_repr(log_entry, target_type, object_id, object_repr)
        change_message = _json_message(log_entry.change_message)

        _create_event(
            action=action,
            source=ADMIN_SOURCE,
            source_id=log_entry.pk,
            event_time=log_entry.action_time,
            actor=log_entry.user,
            target_type=target_type,
            target_id=object_id,
            target_repr=target_repr,
            message=_admin_message(log_entry, action, target_repr),
            new_values={"change_message": change_message, "content_type": target_type},
            dry_run=dry_run,
            report=report,
        )

        # Semantic events share the log entry's source_id; they stay distinct
        # from the generic event because de-duplication also keys on action.
        for semantic in _semantic_events_from_logentry(
            log_entry, target_type, object_id, target_repr, change_message
        ):
            _create_event(
                source=ADMIN_SOURCE,
                source_id=log_entry.pk,
                event_time=log_entry.action_time,
                actor=log_entry.user,
                dry_run=dry_run,
                report=report,
                **semantic,
            )


def _semantic_events_from_logentry(log_entry, target_type, object_id, object_repr, change_message):
    """Derive domain-specific events from one admin log entry.

    Returns a list of keyword-argument dicts (``action``, ``target_type``,
    ``target_id``, ``target_repr``, ``message``, ``new_values``) suitable for
    ``_create_event``. The list is empty when nothing specific can be
    inferred.
    """
    prefix = "Reconstructed from Django admin log:"
    fields = ", ".join(_changed_fields(change_message)) or "unknown fields"

    def build(action, message, new_values=None):
        # Fresh dict per event so no two events share a mutable payload.
        if new_values is None:
            new_values = {"change_message": change_message, "fields": fields}
        return {
            "action": action,
            "target_type": target_type,
            "target_id": object_id,
            "target_repr": object_repr,
            "message": message,
            "new_values": new_values,
        }

    events = []
    if target_type == "user.userprofile" and _has_changed_field(change_message, "Is pi"):
        events.append(
            build(
                AuditEvent.Action.USER_PI_UPGRADED,
                f"{prefix} PI status changed for {object_repr}; direction unknown",
                {"change_message": change_message, "direction": "unknown"},
            )
        )
    elif target_type == "project.projectuser" and _has_changed_field(change_message, "Role"):
        events.append(
            build(
                AuditEvent.Action.PROJECT_USER_ROLE_CHANGED,
                f"{prefix} project user role changed for {object_repr}; exact old/new roles unknown.",
            )
        )
    elif target_type == "resource.resource":
        resource_action = {
            ADDITION: AuditEvent.Action.RESOURCE_CREATED,
            CHANGE: AuditEvent.Action.RESOURCE_CHANGED,
            DELETION: AuditEvent.Action.RESOURCE_DELETED,
        }.get(log_entry.action_flag)
        if resource_action:
            events.append(build(resource_action, f"{prefix} resource event for {object_repr}."))
    elif target_type == "project.project":
        project_action = {
            ADDITION: AuditEvent.Action.PROJECT_CREATED,
            DELETION: AuditEvent.Action.PROJECT_DELETED,
        }.get(log_entry.action_flag)
        if project_action:
            events.append(build(project_action, f"{prefix} project event for {object_repr}."))
        elif log_entry.action_flag == CHANGE:
            # PI and status are checked independently: a single admin edit can
            # change both, and each deserves its own event (the history
            # backfill reports them separately as well).
            if _has_changed_field(change_message, "Pi"):
                events.append(
                    build(
                        AuditEvent.Action.PROJECT_PI_CHANGED,
                        f"{prefix} project PI changed for {object_repr}; exact old/new PI unknown.",
                    )
                )
            if _has_changed_field(change_message, "Status"):
                events.append(
                    build(
                        AuditEvent.Action.PROJECT_STATUS_CHANGED,
                        f"{prefix} project status changed for {object_repr}; exact old/new status unknown.",
                    )
                )
    return events


def _model_or_none(label):
    """Return the model registered under ``label``, or ``None`` if not found."""
    try:
        return apps.get_model(label)
    except LookupError:
        return None


def _choice_name(model_label, pk):
    """Resolve a choice row's ``name`` from its primary key.

    Returns ``None`` for an empty ``pk`` and ``str(pk)`` whenever the model or
    the row cannot be resolved.
    """
    if pk in (None, ""):
        return None
    model = _model_or_none(model_label)
    if model is None:
        return str(pk)
    try:
        return model.objects.get(pk=pk).name
    except Exception:
        # Best effort: a missing row or attribute must not abort the backfill.
        return str(pk)


def _history_source_id(model_label, history):
    """Build the de-duplication key for a history row."""
    return f"{model_label}:{history.history_id}"


def _history_actor(history):
    """Return the row's ``history_user`` attribute, or ``None`` when absent."""
    return getattr(history, "history_user", None)


def _resource_repr(history):
    """Label a resource history row by name, falling back to its id."""
    if getattr(history, "name", ""):
        return f"Resource: {history.name}"
    return f"Resource id {history.id}"


def _project_reference(project_id):
    """Return a display label for a project id, tolerating missing data."""
    if project_id in (None, ""):
        return "Project unknown"
    Project = _model_or_none("project.Project")
    if Project is None:
        return f"Project id {project_id} (model unavailable)"
    return project_label_from_id(project_id)


def _project_history_repr(history, *, deleted=False):
    """Label a project history row.

    The label from ``project_label_from_id`` is used unless it contains
    ``"(not found)"``; deleted rows and unresolved projects fall back to the
    title recorded on the history row.
    """
    title = getattr(history, "title", "") or f"Project id {history.id}"
    if deleted:
        return f"Deleted project: {title} (id {history.id})"
    current_label = project_label_from_id(history.id)
    if "(not found)" not in current_label:
        return current_label
    return f"Project: {title} (id {history.id})"


def _user_reference(user_id):
    """Return a display label for a user id, tolerating missing data."""
    if user_id in (None, ""):
        return "User unknown"
    User = _model_or_none("auth.User")
    if User is None:
        return f"User id {user_id} (model unavailable)"
    try:
        return f"{user_label(User.objects.get(pk=user_id))} (id {user_id})"
    except Exception:
        # Best effort: an unresolvable user only degrades the label.
        return f"User id {user_id} (not found)"


def _allocation_repr(history):
    """Label an allocation history row.

    Prefers the label of the live allocation; otherwise builds one from the
    row's ``resource_names`` (if any) and project reference.
    """
    Allocation = _model_or_none("allocation.Allocation")
    if Allocation is not None:
        try:
            return allocation_label(Allocation.objects.get(pk=history.id))
        except Exception:
            # Deliberate fall-through to the label built from the history row.
            pass
    resource_names = getattr(history, "resource_names", [])
    return allocation_label_from_parts(
        resource_names, _project_reference(getattr(history, "project_id", None))
    )


def _project_user_repr(history):
    """Label a project-user history row.

    Prefers the label of the live project user; otherwise composes one from
    the row's user and project ids.
    """
    ProjectUser = _model_or_none("project.ProjectUser")
    if ProjectUser is not None:
        try:
            return project_user_label(ProjectUser.objects.get(pk=history.id))
        except Exception:
            # Deliberate fall-through to the label built from the history row.
            pass
    return (
        f"Project user {history.id}: "
        f"{_user_reference(getattr(history, 'user_id', None))} on "
        f"{_project_reference(getattr(history, 'project_id', None))}"
    )


def _iter_adjacent(history_model):
    """Yield ``(previous, row)`` pairs over every row of ``history_model``.

    Rows are ordered by ``id``, ``history_date``, ``history_id``. ``previous``
    is the row yielded most recently for the same ``id``, or ``None`` for the
    first row of that ``id``.
    """
    previous_by_id = {}
    for row in history_model.objects.order_by("id", "history_date", "history_id"):
        yield previous_by_id.get(row.id), row
        previous_by_id[row.id] = row


def backfill_history(*, dry_run, report):
    """Run the project, resource, project-user and allocation history backfills."""
    _backfill_project_history(dry_run=dry_run, report=report)
    _backfill_resource_history(dry_run=dry_run, report=report)
    _backfill_project_user_history(dry_run=dry_run, report=report)
    _backfill_allocation_history(dry_run=dry_run, report=report)


def _backfill_project_history(*, dry_run, report):
    """Create project events from ``project.HistoricalProject`` rows.

    ``+`` rows become PROJECT_CREATED and ``-`` rows PROJECT_DELETED. ``~``
    rows are compared with the previous row of the same project: a status
    change yields PROJECT_ARCHIVED (when the new status name is "Archived")
    or PROJECT_STATUS_CHANGED, and a PI change yields PROJECT_PI_CHANGED.
    Rows that yield nothing are tallied as ambiguous. Does nothing when the
    historical model is not installed.
    """
    model_label = "project.HistoricalProject"
    model = _model_or_none(model_label)
    if model is None:
        return

    for previous, history in _iter_adjacent(model):
        report.record_examined(model_label)

        if history.history_type == "+":
            label = _project_history_repr(history)
            _create_event(
                action=AuditEvent.Action.PROJECT_CREATED,
                source=HISTORY_SOURCE,
                source_id=_history_source_id(model_label, history),
                event_time=history.history_date,
                actor=_history_actor(history),
                target_type="project.project",
                target_id=history.id,
                target_repr=label,
                message=f"Reconstructed from ColdFront history: project created: {label}.",
                new_values=_project_history_values(history),
                dry_run=dry_run,
                report=report,
            )
            continue

        if history.history_type == "-":
            label = _project_history_repr(history, deleted=True)
            _create_event(
                action=AuditEvent.Action.PROJECT_DELETED,
                source=HISTORY_SOURCE,
                source_id=_history_source_id(model_label, history),
                event_time=history.history_date,
                actor=_history_actor(history),
                target_type="project.project",
                target_id=history.id,
                target_repr=label,
                message=f"Reconstructed from ColdFront history: project deleted: {label}.",
                old_values=_project_history_values(history),
                dry_run=dry_run,
                report=report,
            )
            continue

        if history.history_type != "~" or previous is None:
            report.record_ambiguous()
            continue

        event_created = False
        if previous.status_id != history.status_id:
            old_status = _choice_name("project.ProjectStatusChoice", previous.status_id)
            new_status = _choice_name("project.ProjectStatusChoice", history.status_id)
            action = (
                AuditEvent.Action.PROJECT_ARCHIVED
                if new_status == "Archived"
                else AuditEvent.Action.PROJECT_STATUS_CHANGED
            )
            _create_event(
                action=action,
                source=HISTORY_SOURCE,
                source_id=_history_source_id(model_label, history),
                event_time=history.history_date,
                actor=_history_actor(history),
                target_type="project.project",
                target_id=history.id,
                target_repr=_project_history_repr(history),
                message=f"Reconstructed from ColdFront history: project status changed from {old_status} to {new_status}.",
                old_values={"status": old_status, "pi_id": previous.pi_id},
                new_values={"status": new_status, "pi_id": history.pi_id},
                dry_run=dry_run,
                report=report,
            )
            event_created = True

        if previous.pi_id != history.pi_id:
            _create_event(
                action=AuditEvent.Action.PROJECT_PI_CHANGED,
                source=HISTORY_SOURCE,
                source_id=_history_source_id(model_label, history),
                event_time=history.history_date,
                actor=_history_actor(history),
                target_type="project.project",
                target_id=history.id,
                target_repr=_project_history_repr(history),
                message=(
                    "Reconstructed from ColdFront history: project PI changed "
                    f"from {_user_reference(previous.pi_id)} to {_user_reference(history.pi_id)}."
                ),
                old_values={"pi_id": previous.pi_id},
                new_values={"pi_id": history.pi_id},
                dry_run=dry_run,
                report=report,
            )
            event_created = True

        if not event_created:
            report.record_ambiguous()


def _backfill_resource_history(*, dry_run, report):
    """Create resource events from ``resource.HistoricalResource`` rows.

    ``+`` and ``-`` rows become RESOURCE_CREATED / RESOURCE_DELETED. A ``~``
    row becomes RESOURCE_CHANGED only when one of the tracked fields differs
    from the previous row; everything else is tallied as ambiguous. Does
    nothing when the historical model is not installed.
    """
    model_label = "resource.HistoricalResource"
    model = _model_or_none(model_label)
    if model is None:
        return

    tracked = ("name", "description", "is_available", "is_public", "is_allocatable", "requires_payment")
    for previous, history in _iter_adjacent(model):
        report.record_examined(model_label)

        if history.history_type == "+":
            action = AuditEvent.Action.RESOURCE_CREATED
        elif history.history_type == "-":
            action = AuditEvent.Action.RESOURCE_DELETED
        elif (
            history.history_type == "~"
            and previous is not None
            and any(getattr(previous, field) != getattr(history, field) for field in tracked)
        ):
            action = AuditEvent.Action.RESOURCE_CHANGED
        else:
            report.record_ambiguous()
            continue

        label = _resource_repr(history)
        _create_event(
            action=action,
            source=HISTORY_SOURCE,
            source_id=_history_source_id(model_label, history),
            event_time=history.history_date,
            actor=_history_actor(history),
            target_type="resource.resource",
            target_id=history.id,
            target_repr=label,
            message=f"Reconstructed from ColdFront history: resource {label} {action.label.lower()}.",
            # _history_values already returns {} when there is no previous row.
            old_values=_history_values(previous, tracked),
            new_values=_history_values(history, tracked) if history.history_type != "-" else {},
            dry_run=dry_run,
            report=report,
        )


def _backfill_project_user_history(*, dry_run, report):
    """Create role-change events from ``project.HistoricalProjectUser`` rows.

    Only ``~`` rows whose ``role_id`` differs from the previous row of the
    same project user produce an event; all other rows are tallied as
    ambiguous. Does nothing when the historical model is not installed.
    """
    model_label = "project.HistoricalProjectUser"
    model = _model_or_none(model_label)
    if model is None:
        return

    for previous, history in _iter_adjacent(model):
        report.record_examined(model_label)
        if history.history_type != "~" or previous is None or previous.role_id == history.role_id:
            report.record_ambiguous()
            continue

        old_role = _choice_name("project.ProjectUserRoleChoice", previous.role_id)
        new_role = _choice_name("project.ProjectUserRoleChoice", history.role_id)
        _create_event(
            action=AuditEvent.Action.PROJECT_USER_ROLE_CHANGED,
            source=HISTORY_SOURCE,
            source_id=_history_source_id(model_label, history),
            event_time=history.history_date,
            actor=_history_actor(history),
            target_type="project.projectuser",
            target_id=history.id,
            target_repr=_project_user_repr(history),
            message=f"Reconstructed from ColdFront history: project user role changed from {old_role} to {new_role}.",
            old_values={"role": old_role, "project_id": previous.project_id, "user_id": previous.user_id},
            new_values={"role": new_role, "project_id": history.project_id, "user_id": history.user_id},
            dry_run=dry_run,
            report=report,
        )


def _backfill_allocation_history(*, dry_run, report):
    """Create allocation events from ``allocation.HistoricalAllocation`` rows.

    Only ``~`` rows with a previous row are considered. A status change
    yields ALLOCATION_DISABLED (new status "Denied", "Revoked" or "Disabled")
    or ALLOCATION_STATUS_CHANGED; an end date later than the previous one
    yields ALLOCATION_RENEWED. Rows that yield nothing are tallied as
    ambiguous, as in the other history backfills. Does nothing when the
    historical model is not installed.
    """
    model_label = "allocation.HistoricalAllocation"
    model = _model_or_none(model_label)
    if model is None:
        return

    for previous, history in _iter_adjacent(model):
        report.record_examined(model_label)
        if history.history_type != "~" or previous is None:
            report.record_ambiguous()
            continue

        event_created = False
        if previous.status_id != history.status_id:
            new_status = _choice_name("allocation.AllocationStatusChoice", history.status_id)
            old_status = _choice_name("allocation.AllocationStatusChoice", previous.status_id)
            action = (
                AuditEvent.Action.ALLOCATION_DISABLED
                if new_status in {"Denied", "Revoked", "Disabled"}
                else AuditEvent.Action.ALLOCATION_STATUS_CHANGED
            )
            _create_event(
                action=action,
                source=HISTORY_SOURCE,
                source_id=_history_source_id(model_label, history),
                event_time=history.history_date,
                actor=_history_actor(history),
                target_type="allocation.allocation",
                target_id=history.id,
                target_repr=_allocation_repr(history),
                message=f"Reconstructed from ColdFront history: allocation status changed from {old_status} to {new_status}.",
                old_values={"status": old_status, "project_id": previous.project_id},
                new_values={"status": new_status, "project_id": history.project_id},
                dry_run=dry_run,
                report=report,
            )
            event_created = True

        if previous.end_date and history.end_date and history.end_date > previous.end_date:
            _create_event(
                action=AuditEvent.Action.ALLOCATION_RENEWED,
                source=HISTORY_SOURCE,
                source_id=_history_source_id(model_label, history),
                event_time=history.history_date,
                actor=_history_actor(history),
                target_type="allocation.allocation",
                target_id=history.id,
                target_repr=_allocation_repr(history),
                message=f"Reconstructed from ColdFront history: allocation end date increased from {previous.end_date} to {history.end_date}.",
                old_values={"end_date": previous.end_date.isoformat(), "project_id": previous.project_id},
                new_values={"end_date": history.end_date.isoformat(), "project_id": history.project_id},
                dry_run=dry_run,
                report=report,
            )
            event_created = True

        if not event_created:
            # Keep the report consistent with the other history backfills: a
            # row that produced no event is counted rather than silently
            # skipped.
            report.record_ambiguous()


def _history_values(history, fields):
    """Return ``{field: value}`` for ``fields`` on ``history``; ``{}`` for ``None``."""
    if history is None:
        return {}
    return {field: getattr(history, field) for field in fields}


def _project_history_values(history):
    """Snapshot the fields of a project history row stored on created/deleted events."""
    return {
        "title": history.title,
        "status": _choice_name("project.ProjectStatusChoice", history.status_id),
        "status_id": history.status_id,
        "pi_id": history.pi_id,
        "project_code": history.project_code,
    }


def run_backfill(*, dry_run):
    """Run the admin-log and history backfills and return the ``BackfillReport``.

    With ``dry_run`` set, nothing is written; the report's ``would_create``
    tallies show what a real run would create.
    """
    report = BackfillReport()
    backfill_admin_log(dry_run=dry_run, report=report)
    backfill_history(dry_run=dry_run, report=report)
    return report