cmmc_audit/signals.py
2026-05-27 15:48:32 -06:00

538 lines
20 KiB
Python

import datetime
from django.contrib.auth import get_user_model
from django.db.models.signals import post_delete, post_save, pre_save
from django.dispatch import receiver
from coldfront.core.allocation.models import Allocation, AllocationChangeRequest
from coldfront.core.allocation.signals import (
allocation_activate,
allocation_change_approved,
allocation_change_created,
allocation_disable,
allocation_new,
)
from coldfront.core.project.models import Project, ProjectReview, ProjectUser
from coldfront.core.project.signals import project_activate_user, project_archive, project_remove_user
from coldfront.core.resource.models import Resource
from coldfront.core.user.models import UserProfile
from .models import AuditEvent
from .resolvers import allocation_label, project_label, project_review_label, project_user_label, user_label
from .utils import log_event
from .project_review_audit import is_project_review_status_suppressed
# Resolve the project's active user model once at import time; the
# User-scoped receivers in this module pass it as their signal ``sender``.
User = get_user_model()
def _choice_name(choice):
return getattr(choice, "name", None)
def _date_value(value):
return value.isoformat() if value else None
def _user_label(user):
try:
return user_label(user)
except Exception:
return getattr(user, "username", str(user))
def _project_user_summary(project_user):
    """Return the display label for a project membership via ``project_user_label``."""
    summary = project_user_label(project_user)
    return summary
def _allocation_summary(allocation):
    """Return the display label for an allocation via ``allocation_label``."""
    summary = allocation_label(allocation)
    return summary
def _project_review_summary(project_review):
    """Return the display label for a project review via ``project_review_label``."""
    summary = project_review_label(project_review)
    return summary
def _project_values(project):
    """Collect the project fields recorded in audit payloads.

    Returns a dict holding the project's title, status name, PI id and
    project code, in that key order.
    """
    status_name = _choice_name(project.status)
    return dict(
        title=project.title,
        status=status_name,
        pi_id=project.pi_id,
        project_code=project.project_code,
    )
def _cache_old_instance(instance, attr_name):
if not instance.pk:
setattr(instance, attr_name, None)
return
try:
old_instance = instance.__class__.objects.get(pk=instance.pk)
except instance.__class__.DoesNotExist:
old_instance = None
setattr(instance, attr_name, old_instance)
@receiver(pre_save, sender=UserProfile, dispatch_uid="cmmc_audit_user_profile_pre_save")
def cache_user_profile(sender, instance, **kwargs):
    """Before a UserProfile save, cache its stored state on ``_cmmc_audit_old``."""
    _cache_old_instance(instance, attr_name="_cmmc_audit_old")
@receiver(post_save, sender=UserProfile, dispatch_uid="cmmc_audit_user_profile_post_save")
def audit_user_profile_pi_status(sender, instance, created, **kwargs):
    """Log an audit event when an existing profile's ``is_pi`` flag flips.

    Newly created profiles, saves without a cached pre-save snapshot, and
    saves that leave ``is_pi`` unchanged produce no event. The event targets
    the profile's user, not the profile row.
    """
    previous = None if created else getattr(instance, "_cmmc_audit_old", None)
    if previous is None or previous.is_pi == instance.is_pi:
        return

    label = _user_label(instance.user)
    if instance.is_pi:
        action, outcome = AuditEvent.Action.USER_PI_UPGRADED, "upgraded to PI"
    else:
        action, outcome = AuditEvent.Action.PI_STATUS_REVOKED, "PI status revoked"

    log_event(
        action,
        instance.user,
        old_values={"is_pi": previous.is_pi},
        new_values={"is_pi": instance.is_pi},
        message=f"PI status changed for {label}: {outcome}",
        source=AuditEvent.Source.COLDFRONT_SIGNAL,
        target_repr=label,
    )
@receiver(pre_save, sender=User, dispatch_uid="cmmc_audit_user_pre_save")
def cache_user(sender, instance, **kwargs):
    """Before a User save, cache its stored state on ``_cmmc_audit_old``."""
    _cache_old_instance(instance, attr_name="_cmmc_audit_old")
@receiver(post_save, sender=User, dispatch_uid="cmmc_audit_user_post_save")
def audit_user_admin_privileges(sender, instance, created, **kwargs):
    """Log an audit event when ``is_staff`` or ``is_superuser`` changes on a user.

    Newly created users and saves without a cached pre-save snapshot are
    ignored, as are saves where both flags are unchanged.
    """
    if created:
        return
    previous = getattr(instance, "_cmmc_audit_old", None)
    if previous is None:
        return

    flags = ("is_staff", "is_superuser")
    before = {flag: getattr(previous, flag) for flag in flags}
    after = {flag: getattr(instance, flag) for flag in flags}
    if before == after:
        return

    summary = (
        f"Admin privileges changed for {instance.username}: "
        f"is_staff {before['is_staff']} -> {after['is_staff']}; "
        f"is_superuser {before['is_superuser']} -> {after['is_superuser']}"
    )
    log_event(
        AuditEvent.Action.USER_ADMIN_PRIVILEGES_CHANGED,
        instance,
        old_values=before,
        new_values=after,
        message=summary,
        source=AuditEvent.Source.COLDFRONT_SIGNAL,
        target_repr=instance.username,
    )
@receiver(allocation_new, dispatch_uid="cmmc_audit_allocation_new")
def audit_allocation_requested(sender, allocation_pk, **kwargs):
    """Log an ALLOCATION_REQUESTED event when the ``allocation_new`` signal fires.

    The allocation is loaded by ``allocation_pk``; its status name, start and
    end dates, project id and attached resource ids are recorded as the new
    values.
    """
    allocation = Allocation.objects.select_related("project", "status").get(pk=allocation_pk)
    resource_ids = list(allocation.resources.values_list("pk", flat=True))
    requested_state = {
        "status": _choice_name(allocation.status),
        "start_date": _date_value(allocation.start_date),
        "end_date": _date_value(allocation.end_date),
        "project_id": allocation.project_id,
        "resource_ids": resource_ids,
    }
    log_event(
        AuditEvent.Action.ALLOCATION_REQUESTED,
        allocation,
        new_values=requested_state,
        message=f"Allocation requested for {project_label(allocation.project)}",
        source=AuditEvent.Source.COLDFRONT_SIGNAL,
        target_repr=_allocation_summary(allocation),
    )
@receiver(allocation_activate, dispatch_uid="cmmc_audit_allocation_activate")
def audit_allocation_activated(sender, allocation_pk, **kwargs):
    """Log an ALLOCATION_STATUS_CHANGED event when ``allocation_activate`` fires.

    The allocation is loaded by ``allocation_pk`` and its current status name
    is recorded as the new value.
    """
    # The message reads allocation.project, so join it here rather than
    # paying for a second query; this matches audit_allocation_requested.
    allocation = Allocation.objects.select_related("project", "status").get(pk=allocation_pk)
    log_event(
        AuditEvent.Action.ALLOCATION_STATUS_CHANGED,
        allocation,
        new_values={"status": _choice_name(allocation.status)},
        message=f"Allocation approved for {project_label(allocation.project)}",
        source=AuditEvent.Source.COLDFRONT_SIGNAL,
        target_repr=_allocation_summary(allocation),
    )
@receiver(allocation_disable, dispatch_uid="cmmc_audit_allocation_disable")
def audit_allocation_disabled(sender, allocation_pk, **kwargs):
    """Log an ALLOCATION_DISABLED event when ``allocation_disable`` fires.

    The allocation is loaded by ``allocation_pk`` and the status it ended up
    in is recorded both in the new values and in the message.
    """
    # The message reads allocation.project, so join it here rather than
    # paying for a second query; this matches audit_allocation_requested.
    allocation = Allocation.objects.select_related("project", "status").get(pk=allocation_pk)
    status_name = _choice_name(allocation.status)
    # allocation_disable only carries allocation_pk. The sending view can know
    # whether the user clicked deny or selected revoke, but that context is not
    # included in the signal, so record the resulting status without guessing.
    log_event(
        AuditEvent.Action.ALLOCATION_DISABLED,
        allocation,
        new_values={"status": status_name},
        message=f"Allocation disabled for {project_label(allocation.project)}; resulting status: {status_name}",
        source=AuditEvent.Source.COLDFRONT_SIGNAL,
        target_repr=_allocation_summary(allocation),
    )
@receiver(allocation_change_created, dispatch_uid="cmmc_audit_allocation_change_created")
def audit_allocation_change_created(sender, allocation_pk, allocation_change_pk, **kwargs):
    """Log a change-request or renewal-request event for a new allocation change.

    The change request is loaded by ``allocation_change_pk``. A truthy
    ``end_date_extension`` is reported as RENEWAL_REQUESTED; anything else is
    reported as ALLOCATION_CHANGE_REQUESTED. The event targets the allocation.
    """
    change = AllocationChangeRequest.objects.select_related("allocation", "status").get(
        pk=allocation_change_pk
    )
    allocation = change.allocation
    label = _allocation_summary(allocation)
    extension = change.end_date_extension

    if extension:
        action = AuditEvent.Action.RENEWAL_REQUESTED
        message = f"Renewal requested for {label}: {extension} days"
    else:
        action = AuditEvent.Action.ALLOCATION_CHANGE_REQUESTED
        message = f"Allocation change requested for {label}"

    log_event(
        action,
        allocation,
        new_values={
            "allocation_change_request_id": change.pk,
            "status": _choice_name(change.status),
            "end_date_extension": extension,
        },
        message=message,
        source=AuditEvent.Source.COLDFRONT_SIGNAL,
        target_repr=label,
    )
@receiver(allocation_change_approved, dispatch_uid="cmmc_audit_allocation_change_approved")
def audit_allocation_change_approved(sender, allocation_pk, allocation_change_pk, **kwargs):
    """Log RENEWAL_APPROVED and ALLOCATION_RENEWED when an extension is approved.

    The change request is loaded by ``allocation_change_pk``. Requests without
    a truthy ``end_date_extension`` are ignored. Otherwise two events are
    logged against the allocation, sharing the same new-values payload.
    """
    change = AllocationChangeRequest.objects.select_related("allocation", "status").get(
        pk=allocation_change_pk
    )
    extension = change.end_date_extension
    if not extension:
        return

    allocation = change.allocation
    label = _allocation_summary(allocation)
    end_date = allocation.end_date

    # The signal carries no prior end date, so it is derived by subtracting the
    # extension from the current one.
    # NOTE(review): assumes the allocation's end_date has already been extended
    # by the time this signal fires - confirm against the sending view.
    previous_end_date = end_date - datetime.timedelta(days=extension) if end_date else None
    transition = f"end date {_date_value(previous_end_date)} -> {_date_value(end_date)}"

    values = {
        "allocation_change_request_id": change.pk,
        "status": _choice_name(change.status),
        "end_date_extension": extension,
        "allocation_end_date": _date_value(end_date),
    }

    log_event(
        AuditEvent.Action.RENEWAL_APPROVED,
        allocation,
        new_values=values,
        message=f"Renewal approved for {label}, {transition}",
        source=AuditEvent.Source.COLDFRONT_SIGNAL,
        target_repr=label,
    )
    log_event(
        AuditEvent.Action.ALLOCATION_RENEWED,
        allocation,
        new_values=values,
        message=f"{label} renewed, {transition}",
        source=AuditEvent.Source.COLDFRONT_SIGNAL,
        target_repr=label,
    )
@receiver(project_activate_user, dispatch_uid="cmmc_audit_project_activate_user")
def audit_project_user_added(sender, project_user_pk, **kwargs):
    """Log a PROJECT_USER_ADDED event when ``project_activate_user`` fires.

    The membership is loaded by ``project_user_pk`` and its project id, user
    id, role name and status name are recorded as the new values.
    """
    membership = ProjectUser.objects.select_related("project", "user", "role", "status").get(pk=project_user_pk)
    # Project.add_user() uses update_or_create(), but project_activate_user only
    # provides project_user_pk. Without created/old-state metadata, this signal
    # can mean a new membership, reactivation, role update, or status update.
    snapshot = {
        "project_id": membership.project_id,
        "user_id": membership.user_id,
        "role": _choice_name(membership.role),
        "status": _choice_name(membership.status),
    }
    description = (
        f"User {membership.user.username} added/activated on "
        f"{project_label(membership.project)} as {membership.role.name}"
    )
    log_event(
        AuditEvent.Action.PROJECT_USER_ADDED,
        membership,
        new_values=snapshot,
        message=description,
        source=AuditEvent.Source.COLDFRONT_SIGNAL,
        target_repr=_project_user_summary(membership),
    )
@receiver(project_remove_user, dispatch_uid="cmmc_audit_project_remove_user")
def audit_project_user_removed(sender, project_user_pk, **kwargs):
    """Log a PROJECT_USER_REMOVED event when ``project_remove_user`` fires.

    The membership is loaded by ``project_user_pk`` and its project id, user
    id, role name and status name are recorded as the old values.
    """
    membership = ProjectUser.objects.select_related("project", "user", "role", "status").get(pk=project_user_pk)
    snapshot = {
        "project_id": membership.project_id,
        "user_id": membership.user_id,
        "role": _choice_name(membership.role),
        "status": _choice_name(membership.status),
    }
    description = f"User {membership.user.username} removed from {project_label(membership.project)}"
    log_event(
        AuditEvent.Action.PROJECT_USER_REMOVED,
        membership,
        old_values=snapshot,
        message=description,
        source=AuditEvent.Source.COLDFRONT_SIGNAL,
        target_repr=_project_user_summary(membership),
    )
@receiver(pre_save, sender=Project, dispatch_uid="cmmc_audit_project_pre_save")
def cache_project(sender, instance, **kwargs):
    """Before a Project save, cache its stored state on ``_cmmc_audit_old``."""
    _cache_old_instance(instance, attr_name="_cmmc_audit_old")
@receiver(post_save, sender=Project, dispatch_uid="cmmc_audit_project_post_save")
def audit_project_save(sender, instance, created, **kwargs):
    """Audit project creation and changes to status, PI and forced review.

    A newly created project yields one PROJECT_CREATED event. For an existing
    project the pre-save snapshot cached on ``_cmmc_audit_old`` is compared
    with the saved instance and up to three events are logged, in this order:

    * a status change, reported as PROJECT_ARCHIVED when the new status name
      is "Archived" and as PROJECT_STATUS_CHANGED otherwise;
    * PROJECT_PI_CHANGED when ``pi_id`` differs;
    * PROJECT_REVIEW_FORCED when ``force_review`` goes from falsy to truthy.

    Nothing is logged for an existing project when no snapshot is available.
    """
    if created:
        log_event(
            AuditEvent.Action.PROJECT_CREATED,
            instance,
            new_values=_project_values(instance),
            message=f"Project created: {project_label(instance)}",
            source=AuditEvent.Source.COLDFRONT_SIGNAL,
            target_repr=project_label(instance),
        )
        return

    old_instance = getattr(instance, "_cmmc_audit_old", None)
    if old_instance is None:
        return

    old_status = _choice_name(old_instance.status)
    new_status = _choice_name(instance.status)
    if old_status != new_status:
        action = (
            AuditEvent.Action.PROJECT_ARCHIVED
            if new_status == "Archived"
            else AuditEvent.Action.PROJECT_STATUS_CHANGED
        )
        log_event(
            action,
            instance,
            old_values={"status": old_status, "pi_id": old_instance.pi_id},
            new_values={"status": new_status, "pi_id": instance.pi_id},
            message=f"Project status changed for {project_label(instance)}: {old_status} -> {new_status}",
            source=AuditEvent.Source.COLDFRONT_SIGNAL,
            target_repr=project_label(instance),
        )

    if old_instance.pi_id != instance.pi_id:
        log_event(
            AuditEvent.Action.PROJECT_PI_CHANGED,
            instance,
            old_values={"pi_id": old_instance.pi_id},
            new_values={"pi_id": instance.pi_id},
            message=f"Project PI changed for {project_label(instance)}",
            source=AuditEvent.Source.COLDFRONT_SIGNAL,
            target_repr=project_label(instance),
        )

    # Only a falsy -> truthy transition counts as forcing a review; the
    # creation path has already returned above, so no second check is needed.
    if not old_instance.force_review and instance.force_review:
        log_event(
            AuditEvent.Action.PROJECT_REVIEW_FORCED,
            instance,
            old_values={
                "force_review": old_instance.force_review,
                "requires_review": old_instance.requires_review,
            },
            new_values={
                "force_review": instance.force_review,
                "requires_review": instance.requires_review,
            },
            message=f"Project review forced for {project_label(instance)}",
            source=AuditEvent.Source.COLDFRONT_SIGNAL,
            target_repr=project_label(instance),
        )
@receiver(project_archive, dispatch_uid="cmmc_audit_project_archive")
def audit_project_archived_signal(sender, project_obj, **kwargs):
    """Log PROJECT_ARCHIVED for ``project_archive`` unless one is already recorded.

    An existing signal-sourced PROJECT_ARCHIVED event for the same project id
    suppresses the new one.
    """
    # NOTE(review): this lookup has no time bound, so any earlier archive event
    # for the project suppresses this one - confirm that is intended for
    # projects that are archived more than once.
    already_logged = AuditEvent.objects.filter(
        action=AuditEvent.Action.PROJECT_ARCHIVED,
        source=AuditEvent.Source.COLDFRONT_SIGNAL,
        target_type="project.project",
        target_id=str(project_obj.pk),
    ).exists()
    if already_logged:
        return

    label = project_label(project_obj)
    log_event(
        AuditEvent.Action.PROJECT_ARCHIVED,
        project_obj,
        new_values=_project_values(project_obj),
        message=f"Project archived: {label}",
        source=AuditEvent.Source.COLDFRONT_SIGNAL,
        target_repr=label,
    )
@receiver(post_delete, sender=Project, dispatch_uid="cmmc_audit_project_post_delete")
def audit_project_delete(sender, instance, **kwargs):
    """Log a PROJECT_DELETED event carrying the deleted project's field values."""
    label = project_label(instance)
    log_event(
        AuditEvent.Action.PROJECT_DELETED,
        instance,
        old_values=_project_values(instance),
        message=f"Project deleted: {label}",
        source=AuditEvent.Source.COLDFRONT_SIGNAL,
        target_repr=label,
    )
@receiver(pre_save, sender=ProjectReview, dispatch_uid="cmmc_audit_project_review_pre_save")
def cache_project_review(sender, instance, **kwargs):
    """Before a ProjectReview save, cache its stored state on ``_cmmc_audit_old``."""
    _cache_old_instance(instance, attr_name="_cmmc_audit_old")
@receiver(post_save, sender=ProjectReview, dispatch_uid="cmmc_audit_project_review_post_save")
def audit_project_review_status_changed(sender, instance, created, **kwargs):
    """Log PROJECT_REVIEW_STATUS_CHANGED when an existing review changes status.

    No event is logged for newly created reviews, for reviews that
    ``is_project_review_status_suppressed`` reports as suppressed, when no
    pre-save snapshot is cached, or when the status name is unchanged.
    """
    if created:
        return
    if is_project_review_status_suppressed(instance.pk):
        return

    previous = getattr(instance, "_cmmc_audit_old", None)
    if previous is None:
        return

    old_status = _choice_name(previous.status)
    new_status = _choice_name(instance.status)
    if old_status != new_status:
        log_event(
            AuditEvent.Action.PROJECT_REVIEW_STATUS_CHANGED,
            instance,
            old_values={"status": old_status, "project_id": instance.project_id},
            new_values={"status": new_status, "project_id": instance.project_id},
            message=f"Project review status changed for {project_label(instance.project)}: {old_status} -> {new_status}",
            source=AuditEvent.Source.COLDFRONT_SIGNAL,
            target_repr=_project_review_summary(instance),
        )
@receiver(pre_save, sender=ProjectUser, dispatch_uid="cmmc_audit_project_user_role_pre_save")
def cache_project_user(sender, instance, **kwargs):
    """Before a ProjectUser save, cache its stored state on ``_cmmc_audit_old``."""
    _cache_old_instance(instance, attr_name="_cmmc_audit_old")
@receiver(post_save, sender=ProjectUser, dispatch_uid="cmmc_audit_project_user_role_post_save")
def audit_project_user_role_change(sender, instance, created, **kwargs):
    """Log PROJECT_USER_ROLE_CHANGED when an existing membership's role name changes.

    Newly created memberships, saves without a cached pre-save snapshot, and
    saves that keep the same role name produce no event.
    """
    previous = None if created else getattr(instance, "_cmmc_audit_old", None)
    if previous is None:
        return

    old_role = _choice_name(previous.role)
    new_role = _choice_name(instance.role)
    if old_role == new_role:
        return

    description = (
        f"Project user role changed for {instance.user.username} on "
        f"{project_label(instance.project)}: {old_role} -> {new_role}"
    )
    log_event(
        AuditEvent.Action.PROJECT_USER_ROLE_CHANGED,
        instance,
        old_values={"role": old_role},
        new_values={"role": new_role},
        message=description,
        source=AuditEvent.Source.COLDFRONT_SIGNAL,
        target_repr=_project_user_summary(instance),
    )
@receiver(pre_save, sender=Resource, dispatch_uid="cmmc_audit_resource_pre_save")
def cache_resource(sender, instance, **kwargs):
    """Before a Resource save, cache its stored state on ``_cmmc_audit_old``."""
    _cache_old_instance(instance, attr_name="_cmmc_audit_old")
@receiver(post_save, sender=Resource, dispatch_uid="cmmc_audit_resource_post_save")
def audit_resource_save(sender, instance, created, **kwargs):
    """Log RESOURCE_CREATED for new resources and RESOURCE_CHANGED for edits.

    For an existing resource the cached pre-save snapshot is compared with the
    saved instance across a fixed set of fields; only the fields whose values
    differ are included in the event's old and new values. No event is logged
    when there is no snapshot or when none of the tracked fields changed.
    """
    if created:
        log_event(
            AuditEvent.Action.RESOURCE_CREATED,
            instance,
            new_values={"name": instance.name, "resource_type_id": instance.resource_type_id},
            message=f"Resource {instance.name} created",
            source=AuditEvent.Source.COLDFRONT_SIGNAL,
            target_repr=instance.name,
        )
        return

    previous = getattr(instance, "_cmmc_audit_old", None)
    if previous is None:
        return

    # Field order here determines key order in the logged payloads.
    fields = (
        "name",
        "description",
        "is_available",
        "is_public",
        "is_allocatable",
        "requires_payment",
        "resource_type_id",
        "parent_resource_id",
    )
    before = {field: getattr(previous, field) for field in fields}
    after = {field: getattr(instance, field) for field in fields}
    changed = [field for field in fields if before[field] != after[field]]
    if not changed:
        return

    log_event(
        AuditEvent.Action.RESOURCE_CHANGED,
        instance,
        old_values={field: before[field] for field in changed},
        new_values={field: after[field] for field in changed},
        message=f"Resource {instance.name} changed",
        source=AuditEvent.Source.COLDFRONT_SIGNAL,
        target_repr=instance.name,
    )
@receiver(post_delete, sender=Resource, dispatch_uid="cmmc_audit_resource_post_delete")
def audit_resource_delete(sender, instance, **kwargs):
    """Log a RESOURCE_DELETED event recording the deleted resource's name and type id."""
    name = instance.name
    removed_state = {"name": name, "resource_type_id": instance.resource_type_id}
    log_event(
        AuditEvent.Action.RESOURCE_DELETED,
        instance,
        old_values=removed_state,
        message=f"Resource {name} deleted",
        source=AuditEvent.Source.COLDFRONT_SIGNAL,
        target_repr=name,
    )