import datetime from django.contrib.auth import get_user_model from django.db.models.signals import post_delete, post_save, pre_save from django.dispatch import receiver from coldfront.core.allocation.models import Allocation, AllocationChangeRequest from coldfront.core.allocation.signals import ( allocation_activate, allocation_change_approved, allocation_change_created, allocation_disable, allocation_new, ) from coldfront.core.project.models import Project, ProjectReview, ProjectUser from coldfront.core.project.signals import project_activate_user, project_archive, project_remove_user from coldfront.core.resource.models import Resource from coldfront.core.user.models import UserProfile from .models import AuditEvent from .resolvers import allocation_label, project_label, project_review_label, project_user_label, user_label from .utils import log_event from .project_review_audit import is_project_review_status_suppressed User = get_user_model() def _choice_name(choice): return getattr(choice, "name", None) def _date_value(value): return value.isoformat() if value else None def _user_label(user): try: return user_label(user) except Exception: return getattr(user, "username", str(user)) def _project_user_summary(project_user): return project_user_label(project_user) def _allocation_summary(allocation): return allocation_label(allocation) def _project_review_summary(project_review): return project_review_label(project_review) def _project_values(project): return { "title": project.title, "status": _choice_name(project.status), "pi_id": project.pi_id, "project_code": project.project_code, } def _cache_old_instance(instance, attr_name): if not instance.pk: setattr(instance, attr_name, None) return try: old_instance = instance.__class__.objects.get(pk=instance.pk) except instance.__class__.DoesNotExist: old_instance = None setattr(instance, attr_name, old_instance) @receiver(pre_save, sender=UserProfile, dispatch_uid="cmmc_audit_user_profile_pre_save") def cache_user_profile(sender, instance, **kwargs): _cache_old_instance(instance, "_cmmc_audit_old") @receiver(post_save, sender=UserProfile, dispatch_uid="cmmc_audit_user_profile_post_save") def audit_user_profile_pi_status(sender, instance, created, **kwargs): if created: return old_instance = getattr(instance, "_cmmc_audit_old", None) if old_instance is None or old_instance.is_pi == instance.is_pi: return if instance.is_pi: action = AuditEvent.Action.USER_PI_UPGRADED message = f"PI status changed for {_user_label(instance.user)}: upgraded to PI" else: action = AuditEvent.Action.PI_STATUS_REVOKED message = f"PI status changed for {_user_label(instance.user)}: PI status revoked" log_event( action, instance.user, old_values={"is_pi": old_instance.is_pi}, new_values={"is_pi": instance.is_pi}, message=message, source=AuditEvent.Source.COLDFRONT_SIGNAL, target_repr=_user_label(instance.user), ) @receiver(pre_save, sender=User, dispatch_uid="cmmc_audit_user_pre_save") def cache_user(sender, instance, **kwargs): _cache_old_instance(instance, "_cmmc_audit_old") @receiver(post_save, sender=User, dispatch_uid="cmmc_audit_user_post_save") def audit_user_admin_privileges(sender, instance, created, **kwargs): if created: return old_instance = getattr(instance, "_cmmc_audit_old", None) if old_instance is None: return old_values = { "is_staff": old_instance.is_staff, "is_superuser": old_instance.is_superuser, } new_values = { "is_staff": instance.is_staff, "is_superuser": instance.is_superuser, } if old_values == new_values: return log_event( AuditEvent.Action.USER_ADMIN_PRIVILEGES_CHANGED, instance, old_values=old_values, new_values=new_values, message=( f"Admin privileges changed for {instance.username}: " f"is_staff {old_instance.is_staff} -> {instance.is_staff}; " f"is_superuser {old_instance.is_superuser} -> {instance.is_superuser}" ), source=AuditEvent.Source.COLDFRONT_SIGNAL, target_repr=instance.username, ) @receiver(allocation_new, dispatch_uid="cmmc_audit_allocation_new") def audit_allocation_requested(sender, allocation_pk, **kwargs): allocation = Allocation.objects.select_related("project", "status").get(pk=allocation_pk) log_event( AuditEvent.Action.ALLOCATION_REQUESTED, allocation, new_values={ "status": _choice_name(allocation.status), "start_date": _date_value(allocation.start_date), "end_date": _date_value(allocation.end_date), "project_id": allocation.project_id, "resource_ids": list(allocation.resources.values_list("pk", flat=True)), }, message=f"Allocation requested for {project_label(allocation.project)}", source=AuditEvent.Source.COLDFRONT_SIGNAL, target_repr=_allocation_summary(allocation), ) @receiver(allocation_activate, dispatch_uid="cmmc_audit_allocation_activate") def audit_allocation_activated(sender, allocation_pk, **kwargs): allocation = Allocation.objects.select_related("status").get(pk=allocation_pk) log_event( AuditEvent.Action.ALLOCATION_STATUS_CHANGED, allocation, new_values={"status": _choice_name(allocation.status)}, message=f"Allocation approved for {project_label(allocation.project)}", source=AuditEvent.Source.COLDFRONT_SIGNAL, target_repr=_allocation_summary(allocation), ) @receiver(allocation_disable, dispatch_uid="cmmc_audit_allocation_disable") def audit_allocation_disabled(sender, allocation_pk, **kwargs): allocation = Allocation.objects.select_related("status").get(pk=allocation_pk) status_name = _choice_name(allocation.status) # allocation_disable only carries allocation_pk. The sending view can know # whether the user clicked deny or selected revoke, but that context is not # included in the signal, so record the resulting status without guessing. log_event( AuditEvent.Action.ALLOCATION_DISABLED, allocation, new_values={"status": status_name}, message=f"Allocation disabled for {project_label(allocation.project)}; resulting status: {status_name}", source=AuditEvent.Source.COLDFRONT_SIGNAL, target_repr=_allocation_summary(allocation), ) @receiver(allocation_change_created, dispatch_uid="cmmc_audit_allocation_change_created") def audit_allocation_change_created(sender, allocation_pk, allocation_change_pk, **kwargs): allocation_change = AllocationChangeRequest.objects.select_related("allocation", "status").get( pk=allocation_change_pk ) action = AuditEvent.Action.ALLOCATION_CHANGE_REQUESTED allocation_text = _allocation_summary(allocation_change.allocation) message = f"Allocation change requested for {allocation_text}" if allocation_change.end_date_extension: action = AuditEvent.Action.RENEWAL_REQUESTED message = ( f"Renewal requested for {allocation_text}: " f"{allocation_change.end_date_extension} days" ) log_event( action, allocation_change.allocation, new_values={ "allocation_change_request_id": allocation_change.pk, "status": _choice_name(allocation_change.status), "end_date_extension": allocation_change.end_date_extension, }, message=message, source=AuditEvent.Source.COLDFRONT_SIGNAL, target_repr=_allocation_summary(allocation_change.allocation), ) @receiver(allocation_change_approved, dispatch_uid="cmmc_audit_allocation_change_approved") def audit_allocation_change_approved(sender, allocation_pk, allocation_change_pk, **kwargs): allocation_change = AllocationChangeRequest.objects.select_related("allocation", "status").get( pk=allocation_change_pk ) if not allocation_change.end_date_extension: return allocation_text = _allocation_summary(allocation_change.allocation) values = { "allocation_change_request_id": allocation_change.pk, "status": _choice_name(allocation_change.status), "end_date_extension": allocation_change.end_date_extension, "allocation_end_date": _date_value(allocation_change.allocation.end_date), } previous_end_date = None if allocation_change.allocation.end_date and allocation_change.end_date_extension: previous_end_date = allocation_change.allocation.end_date - datetime.timedelta( days=allocation_change.end_date_extension ) log_event( AuditEvent.Action.RENEWAL_APPROVED, allocation_change.allocation, new_values=values, message=( f"Renewal approved for {allocation_text}, " f"end date {_date_value(previous_end_date)} -> {_date_value(allocation_change.allocation.end_date)}" ), source=AuditEvent.Source.COLDFRONT_SIGNAL, target_repr=_allocation_summary(allocation_change.allocation), ) log_event( AuditEvent.Action.ALLOCATION_RENEWED, allocation_change.allocation, new_values=values, message=( f"{allocation_text} renewed, " f"end date {_date_value(previous_end_date)} -> {_date_value(allocation_change.allocation.end_date)}" ), source=AuditEvent.Source.COLDFRONT_SIGNAL, target_repr=_allocation_summary(allocation_change.allocation), ) @receiver(project_activate_user, dispatch_uid="cmmc_audit_project_activate_user") def audit_project_user_added(sender, project_user_pk, **kwargs): project_user = ProjectUser.objects.select_related("project", "user", "role", "status").get(pk=project_user_pk) # Project.add_user() uses update_or_create(), but project_activate_user only # provides project_user_pk. Without created/old-state metadata, this signal # can mean a new membership, reactivation, role update, or status update. log_event( AuditEvent.Action.PROJECT_USER_ADDED, project_user, new_values={ "project_id": project_user.project_id, "user_id": project_user.user_id, "role": _choice_name(project_user.role), "status": _choice_name(project_user.status), }, message=( f"User {project_user.user.username} added/activated on " f"{project_label(project_user.project)} as {project_user.role.name}" ), source=AuditEvent.Source.COLDFRONT_SIGNAL, target_repr=_project_user_summary(project_user), ) @receiver(project_remove_user, dispatch_uid="cmmc_audit_project_remove_user") def audit_project_user_removed(sender, project_user_pk, **kwargs): project_user = ProjectUser.objects.select_related("project", "user", "role", "status").get(pk=project_user_pk) log_event( AuditEvent.Action.PROJECT_USER_REMOVED, project_user, old_values={ "project_id": project_user.project_id, "user_id": project_user.user_id, "role": _choice_name(project_user.role), "status": _choice_name(project_user.status), }, message=f"User {project_user.user.username} removed from {project_label(project_user.project)}", source=AuditEvent.Source.COLDFRONT_SIGNAL, target_repr=_project_user_summary(project_user), ) @receiver(pre_save, sender=Project, dispatch_uid="cmmc_audit_project_pre_save") def cache_project(sender, instance, **kwargs): _cache_old_instance(instance, "_cmmc_audit_old") @receiver(post_save, sender=Project, dispatch_uid="cmmc_audit_project_post_save") def audit_project_save(sender, instance, created, **kwargs): if created: log_event( AuditEvent.Action.PROJECT_CREATED, instance, new_values=_project_values(instance), message=f"Project created: {project_label(instance)}", source=AuditEvent.Source.COLDFRONT_SIGNAL, target_repr=project_label(instance), ) return old_instance = getattr(instance, "_cmmc_audit_old", None) if old_instance is None: return old_status = _choice_name(old_instance.status) new_status = _choice_name(instance.status) if old_status != new_status: action = ( AuditEvent.Action.PROJECT_ARCHIVED if new_status == "Archived" else AuditEvent.Action.PROJECT_STATUS_CHANGED ) log_event( action, instance, old_values={"status": old_status, "pi_id": old_instance.pi_id}, new_values={"status": new_status, "pi_id": instance.pi_id}, message=f"Project status changed for {project_label(instance)}: {old_status} -> {new_status}", source=AuditEvent.Source.COLDFRONT_SIGNAL, target_repr=project_label(instance), ) if old_instance.pi_id != instance.pi_id: log_event( AuditEvent.Action.PROJECT_PI_CHANGED, instance, old_values={"pi_id": old_instance.pi_id}, new_values={"pi_id": instance.pi_id}, message=f"Project PI changed for {project_label(instance)}", source=AuditEvent.Source.COLDFRONT_SIGNAL, target_repr=project_label(instance), ) if created: return if old_instance.force_review or not instance.force_review: return log_event( AuditEvent.Action.PROJECT_REVIEW_FORCED, instance, old_values={ "force_review": old_instance.force_review, "requires_review": old_instance.requires_review, }, new_values={ "force_review": instance.force_review, "requires_review": instance.requires_review, }, message=f"Project review forced for {project_label(instance)}", source=AuditEvent.Source.COLDFRONT_SIGNAL, target_repr=project_label(instance), ) @receiver(project_archive, dispatch_uid="cmmc_audit_project_archive") def audit_project_archived_signal(sender, project_obj, **kwargs): if AuditEvent.objects.filter( action=AuditEvent.Action.PROJECT_ARCHIVED, source=AuditEvent.Source.COLDFRONT_SIGNAL, target_type="project.project", target_id=str(project_obj.pk), ).exists(): return log_event( AuditEvent.Action.PROJECT_ARCHIVED, project_obj, new_values=_project_values(project_obj), message=f"Project archived: {project_label(project_obj)}", source=AuditEvent.Source.COLDFRONT_SIGNAL, target_repr=project_label(project_obj), ) @receiver(post_delete, sender=Project, dispatch_uid="cmmc_audit_project_post_delete") def audit_project_delete(sender, instance, **kwargs): log_event( AuditEvent.Action.PROJECT_DELETED, instance, old_values=_project_values(instance), message=f"Project deleted: {project_label(instance)}", source=AuditEvent.Source.COLDFRONT_SIGNAL, target_repr=project_label(instance), ) @receiver(pre_save, sender=ProjectReview, dispatch_uid="cmmc_audit_project_review_pre_save") def cache_project_review(sender, instance, **kwargs): _cache_old_instance(instance, "_cmmc_audit_old") @receiver(post_save, sender=ProjectReview, dispatch_uid="cmmc_audit_project_review_post_save") def audit_project_review_status_changed(sender, instance, created, **kwargs): if created or is_project_review_status_suppressed(instance.pk): return old_instance = getattr(instance, "_cmmc_audit_old", None) if old_instance is None: return old_status = _choice_name(old_instance.status) new_status = _choice_name(instance.status) if old_status == new_status: return log_event( AuditEvent.Action.PROJECT_REVIEW_STATUS_CHANGED, instance, old_values={"status": old_status, "project_id": instance.project_id}, new_values={"status": new_status, "project_id": instance.project_id}, message=f"Project review status changed for {project_label(instance.project)}: {old_status} -> {new_status}", source=AuditEvent.Source.COLDFRONT_SIGNAL, target_repr=_project_review_summary(instance), ) @receiver(pre_save, sender=ProjectUser, dispatch_uid="cmmc_audit_project_user_role_pre_save") def cache_project_user(sender, instance, **kwargs): _cache_old_instance(instance, "_cmmc_audit_old") @receiver(post_save, sender=ProjectUser, dispatch_uid="cmmc_audit_project_user_role_post_save") def audit_project_user_role_change(sender, instance, created, **kwargs): if created: return old_instance = getattr(instance, "_cmmc_audit_old", None) if old_instance is None: return old_role = _choice_name(old_instance.role) new_role = _choice_name(instance.role) if old_role == new_role: return log_event( AuditEvent.Action.PROJECT_USER_ROLE_CHANGED, instance, old_values={"role": old_role}, new_values={"role": new_role}, message=( f"Project user role changed for {instance.user.username} on " f"{project_label(instance.project)}: {old_role} -> {new_role}" ), source=AuditEvent.Source.COLDFRONT_SIGNAL, target_repr=_project_user_summary(instance), ) @receiver(pre_save, sender=Resource, dispatch_uid="cmmc_audit_resource_pre_save") def cache_resource(sender, instance, **kwargs): _cache_old_instance(instance, "_cmmc_audit_old") @receiver(post_save, sender=Resource, dispatch_uid="cmmc_audit_resource_post_save") def audit_resource_save(sender, instance, created, **kwargs): if created: log_event( AuditEvent.Action.RESOURCE_CREATED, instance, new_values={"name": instance.name, "resource_type_id": instance.resource_type_id}, message=f"Resource {instance.name} created", source=AuditEvent.Source.COLDFRONT_SIGNAL, target_repr=instance.name, ) return old_instance = getattr(instance, "_cmmc_audit_old", None) if old_instance is None: return tracked_fields = ("name", "description", "is_available", "is_public", "is_allocatable", "requires_payment") old_values = {field: getattr(old_instance, field) for field in tracked_fields} new_values = {field: getattr(instance, field) for field in tracked_fields} old_values["resource_type_id"] = old_instance.resource_type_id new_values["resource_type_id"] = instance.resource_type_id old_values["parent_resource_id"] = old_instance.parent_resource_id new_values["parent_resource_id"] = instance.parent_resource_id changed_old = {key: value for key, value in old_values.items() if value != new_values[key]} changed_new = {key: value for key, value in new_values.items() if old_values[key] != value} if not changed_old: return log_event( AuditEvent.Action.RESOURCE_CHANGED, instance, old_values=changed_old, new_values=changed_new, message=f"Resource {instance.name} changed", source=AuditEvent.Source.COLDFRONT_SIGNAL, target_repr=instance.name, ) @receiver(post_delete, sender=Resource, dispatch_uid="cmmc_audit_resource_post_delete") def audit_resource_delete(sender, instance, **kwargs): log_event( AuditEvent.Action.RESOURCE_DELETED, instance, old_values={"name": instance.name, "resource_type_id": instance.resource_type_id}, message=f"Resource {instance.name} deleted", source=AuditEvent.Source.COLDFRONT_SIGNAL, target_repr=instance.name, )