cmmc_audit/tests.py
2026-05-27 14:45:29 -06:00

574 lines
23 KiB
Python

from io import StringIO
from types import SimpleNamespace
from django.contrib.admin.models import ADDITION, CHANGE, DELETION, LogEntry
from django.contrib.auth import get_user_model
from django.contrib.contenttypes.models import ContentType
from django.core.management import call_command
from django.test import TestCase
from django.utils.html import strip_tags
from django.utils import timezone
from coldfront.core.allocation.models import Allocation, AllocationStatusChoice
from coldfront.core.field_of_science.models import FieldOfScience
from coldfront.core.project.models import Project, ProjectStatusChoice
from coldfront.core.resource.models import Resource, ResourceType
from coldfront.core.user.models import UserProfile
from .backfill import RUN_NAME, _allocation_repr, _project_user_repr
from .models import AuditEvent, BackfillRun
from .resolvers import allocation_label
class BackfillCmmcAuditCommandTests(TestCase):
def setUp(self):
self.user = get_user_model().objects.create_user(
username="audit_backfill_admin",
email="audit_backfill_admin@example.test",
password="unused",
is_staff=True,
)
self.content_type = ContentType.objects.get_for_model(get_user_model())
def _log_entry(self, action_flag, object_id, object_repr, change_message="", content_type=None):
return LogEntry.objects.create(
user=self.user,
content_type=content_type or self.content_type,
object_id=str(object_id),
object_repr=object_repr,
action_flag=action_flag,
change_message=change_message,
)
def _run_command(self, *args):
out = StringIO()
call_command("backfill_cmmc_audit", *args, stdout=out)
return out.getvalue()
def test_dry_run_creates_no_rows(self):
self._log_entry(ADDITION, self.user.pk, self.user.username)
output = self._run_command("--dry-run")
self.assertIn("Mode: dry-run", output)
self.assertEqual(AuditEvent.objects.count(), 0)
self.assertEqual(BackfillRun.objects.count(), 0)
def test_commit_creates_reconstructed_rows_from_admin_log(self):
log_entry = self._log_entry(ADDITION, self.user.pk, self.user.username)
self._run_command("--commit")
event = AuditEvent.objects.get()
self.assertEqual(event.action, AuditEvent.Action.ADMIN_ADDITION)
self.assertEqual(event.source, AuditEvent.Source.DJANGO_ADMIN_LOG)
self.assertEqual(event.source_id, str(log_entry.pk))
self.assertEqual(event.event_time, log_entry.action_time)
self.assertTrue(event.is_reconstructed)
self.assertEqual(BackfillRun.objects.get(name=RUN_NAME).created_events, 1)
def test_second_commit_without_force_does_not_rerun(self):
self._log_entry(ADDITION, self.user.pk, self.user.username)
self._run_command("--commit")
output = self._run_command("--commit")
self.assertIn("use --force to rerun", output)
self.assertEqual(AuditEvent.objects.count(), 1)
self.assertEqual(BackfillRun.objects.count(), 1)
def test_force_rerun_does_not_duplicate_rows(self):
self._log_entry(ADDITION, self.user.pk, self.user.username)
self._run_command("--commit")
output = self._run_command("--commit", "--force")
self.assertIn("Duplicates skipped: 1", output)
self.assertEqual(AuditEvent.objects.count(), 1)
self.assertEqual(BackfillRun.objects.count(), 1)
def test_admin_add_change_delete_mapping(self):
target = get_user_model().objects.create_user(username="mapped_user")
self._log_entry(ADDITION, target.pk, target.username)
self._log_entry(CHANGE, target.pk, target.username, '[{"changed": {"fields": ["Email"]}}]')
self._log_entry(DELETION, target.pk, target.username)
self._run_command("--commit")
self.assertEqual(
set(AuditEvent.objects.values_list("action", flat=True)),
{
AuditEvent.Action.ADMIN_ADDITION,
AuditEvent.Action.ADMIN_CHANGE,
AuditEvent.Action.ADMIN_DELETION,
},
)
def test_duplicate_source_id_action_is_skipped(self):
log_entry = self._log_entry(CHANGE, self.user.pk, self.user.username)
AuditEvent.objects.create(
action=AuditEvent.Action.ADMIN_CHANGE,
actor=self.user,
target_type="auth.user",
target_id=str(self.user.pk),
target_repr=self.user.username,
source=AuditEvent.Source.DJANGO_ADMIN_LOG,
source_id=str(log_entry.pk),
event_time=timezone.now(),
is_reconstructed=True,
)
output = self._run_command("--dry-run")
self.assertIn("Duplicates skipped: 1", output)
self.assertEqual(AuditEvent.objects.count(), 1)
def test_project_deletion_from_admin_log_backfills_project_deleted(self):
project_content_type = ContentType.objects.get_for_model(Project)
self._log_entry(
DELETION,
2016540,
"Quantum Materials Access Review",
"",
content_type=project_content_type,
)
self._run_command("--commit")
event = AuditEvent.objects.get(action=AuditEvent.Action.PROJECT_DELETED)
self.assertEqual(event.target_type, "project.project")
self.assertEqual(event.target_id, "2016540")
self.assertEqual(
event.target_repr,
"Deleted project: Quantum Materials Access Review (id 2016540)",
)
self.assertEqual(event.target_summary, event.target_repr)
def test_live_user_profile_pi_event_display_uses_user_label(self):
target = get_user_model().objects.create_user(
username="live_pi",
first_name="Live",
last_name="Person",
)
target.userprofile.is_pi = True
target.userprofile.save()
event = AuditEvent.objects.get(action=AuditEvent.Action.USER_PI_UPGRADED)
self.assertEqual(event.target_type, "auth.user")
self.assertEqual(event.target_id, str(target.pk))
self.assertEqual(event.target_repr, "live_pi (Live Person)")
self.assertIn("PI status changed for live_pi (Live Person): upgraded to PI", event.message)
def test_reconstructed_user_profile_logentry_display_uses_related_user_label(self):
target = get_user_model().objects.create_user(
username="profile_pi",
first_name="Profile",
last_name="Person",
)
profile_content_type = ContentType.objects.get_for_model(UserProfile)
self._log_entry(
CHANGE,
target.userprofile.pk,
f"UserProfile object ({target.userprofile.pk})",
'[{"changed": {"fields": ["Is pi"]}}]',
content_type=profile_content_type,
)
self._run_command("--commit")
event = AuditEvent.objects.get(action=AuditEvent.Action.USER_PI_UPGRADED)
self.assertEqual(event.target_type, "user.userprofile")
self.assertEqual(event.target_id, str(target.userprofile.pk))
self.assertEqual(event.target_repr, "profile_pi (Profile Person)")
self.assertEqual(
event.message,
"Reconstructed from Django admin log: PI status changed for "
"profile_pi (Profile Person); direction unknown",
)
def test_reconstructed_user_profile_logentry_uses_missing_profile_fallback_label(self):
profile_content_type = ContentType.objects.get_for_model(UserProfile)
self._log_entry(
CHANGE,
999999,
"UserProfile object (999999)",
'[{"changed": {"fields": ["Is pi"]}}]',
content_type=profile_content_type,
)
self._run_command("--commit")
event = AuditEvent.objects.get(action=AuditEvent.Action.USER_PI_UPGRADED)
self.assertEqual(event.target_repr, "UserProfile id 999999 (user not found)")
self.assertEqual(
event.message,
"Reconstructed from Django admin log: PI status changed for "
"UserProfile id 999999 (user not found); direction unknown",
)
def test_one_resource_allocation_label(self):
project = self._project(title="Quantum Materials Access Review")
allocation = self._allocation(project=project)
self._resource("CARC Project Scratch", allocation=allocation)
label = allocation_label(allocation)
self.assertEqual(label, f"CARC Project Scratch allocation for Project: {project.title}")
def test_storage_allocation_label(self):
project = self._project(title="Viral sequencing")
allocation = self._allocation(project=project)
self._resource("CARC Project Storage", allocation=allocation)
label = allocation_label(allocation)
self.assertEqual(label, f"CARC Project Storage allocation for Project: {project.title}")
def test_allocation_label_omits_resource_when_missing(self):
project = self._project(title="Quantum Materials Access Review")
allocation = self._allocation(project=project)
label = allocation_label(allocation)
self.assertEqual(label, f"Allocation for Project: {project.title}")
def test_allocation_target_summary_uses_label_without_allocation_id(self):
project = self._project(title="Quantum Materials Access Review")
allocation = self._allocation(project=project)
self._resource("CARC Project Scratch", allocation=allocation)
event = AuditEvent.objects.create(
action=AuditEvent.Action.ALLOCATION_STATUS_CHANGED,
actor=self.user,
target_type="allocation.allocation",
target_id=str(allocation.pk),
target_repr=f"Allocation {allocation.pk}, stale label",
source=AuditEvent.Source.COLDFRONT_HISTORY,
event_time=timezone.now(),
)
self.assertEqual(event.target_summary, f"CARC Project Scratch allocation for Project: {project.title}")
self.assertNotIn(str(allocation.pk), event.target_summary)
def test_historical_allocation_label_resolves_project_id_without_allocation_id(self):
project = self._project(title="Quantum Materials Access Review")
history = SimpleNamespace(id=1471, project_id=project.pk)
label = _allocation_repr(history)
self.assertEqual(label, f"Allocation for Project: {project.title}")
def test_historical_allocation_label_marks_missing_project_id_with_resource_name(self):
history = SimpleNamespace(id=1471, project_id=2016541, resource_names=["CARC Project Scratch"])
label = _allocation_repr(history)
self.assertEqual(label, "CARC Project Scratch allocation for Project id 2016541 (not found)")
def test_historical_allocation_label_marks_missing_project_id(self):
history = SimpleNamespace(id=1471, project_id=2016541)
label = _allocation_repr(history)
self.assertEqual(label, "Allocation for Project id 2016541 (not found)")
def test_historical_project_user_label_resolves_project_id(self):
project = self._project(title="Quantum Materials Access Review")
history = SimpleNamespace(id=2001, project_id=project.pk, user_id=self.user.pk)
label = _project_user_repr(history)
self.assertIn(f"on Project: {project.title}", label)
self.assertIn(f"{self.user.username} (id {self.user.pk})", label)
def test_historical_project_user_label_marks_missing_project_id(self):
history = SimpleNamespace(id=2001, project_id=2016541, user_id=self.user.pk)
label = _project_user_repr(history)
self.assertIn("Project id 2016541 (not found)", label)
self.assertIn(f"{self.user.username} (id {self.user.pk})", label)
def _project(self, title):
status = ProjectStatusChoice.objects.create(name=f"Active {title}")
field_of_science = FieldOfScience.objects.create(description=f"FOS {title}")
return Project.objects.create(
title=title,
pi=self.user,
description="A sufficiently detailed project description.",
field_of_science=field_of_science,
status=status,
)
def _allocation(self, project):
status = AllocationStatusChoice.objects.create(name=f"Active {project.title}")
return Allocation.objects.create(
project=project,
status=status,
justification="Testing audit allocation labels.",
)
def _resource(self, name, allocation):
resource_type, _ = ResourceType.objects.get_or_create(
name="Storage",
defaults={"description": "Storage resource type"},
)
resource = Resource.objects.create(
name=name,
description="Testing audit allocation labels.",
resource_type=resource_type,
)
allocation.resources.add(resource)
return resource
class AuditEventProjectDisplayTests(TestCase):
def test_missing_project_id_display_is_explicit(self):
event = AuditEvent.objects.create(
action=AuditEvent.Action.PROJECT_DELETED,
target_type="project.project",
target_id="2016540",
event_time=timezone.now(),
)
self.assertEqual(event.target_summary, "Project id 2016540 (not found)")
def test_missing_deleted_project_uses_stored_target_repr(self):
event = AuditEvent.objects.create(
action=AuditEvent.Action.PROJECT_DELETED,
target_type="project.project",
target_id="2016540",
target_repr="Deleted project: Quantum Materials Access Review (id 2016540)",
event_time=timezone.now(),
)
self.assertEqual(event.target_summary, "Deleted project: Quantum Materials Access Review (id 2016540)")
def test_project_archive_status_transition_creates_readable_event(self):
user = get_user_model().objects.create_user(username="project_archive_pi")
project = self._project(user, "Quantum Materials Access Review", "New")
archived = ProjectStatusChoice.objects.create(name="Archived")
AuditEvent.objects.all().delete()
project.status = archived
project.save()
event = AuditEvent.objects.get(action=AuditEvent.Action.PROJECT_ARCHIVED)
self.assertEqual(event.target_repr, f"Project: {project.title}")
self.assertEqual(event.old_values["status"], "New")
self.assertEqual(event.new_values["status"], "Archived")
self.assertEqual(
event.message,
f"Project status changed for Project: {project.title}: New -> Archived",
)
def test_project_pi_change_creates_readable_event(self):
old_pi = get_user_model().objects.create_user(username="old_project_pi")
new_pi = get_user_model().objects.create_user(username="new_project_pi")
project = self._project(old_pi, "Quantum Materials Access Review", "Active")
AuditEvent.objects.all().delete()
project.pi = new_pi
project.save()
event = AuditEvent.objects.get(action=AuditEvent.Action.PROJECT_PI_CHANGED)
self.assertEqual(event.target_repr, f"Project: {project.title}")
self.assertEqual(event.old_values["pi_id"], old_pi.pk)
self.assertEqual(event.new_values["pi_id"], new_pi.pk)
self.assertEqual(event.message, f"Project PI changed for Project: {project.title}")
def _project(self, user, title, status_name):
status = ProjectStatusChoice.objects.create(name=f"{status_name} {title}")
status.name = status_name
status.save(update_fields=["name"])
field_of_science = FieldOfScience.objects.create(description=f"FOS {title} {status_name}")
return Project.objects.create(
title=title,
pi=user,
description="A sufficiently detailed project description.",
field_of_science=field_of_science,
status=status,
)
class AuditEventAllocationDisplayTests(TestCase):
def test_new_to_active_reads_as_approved(self):
event = self._event(
AuditEvent.Action.ALLOCATION_STATUS_CHANGED,
old_values={"status": "New"},
new_values={"status": "Active"},
)
self.assertEqual(event.transition_display, "New \u2192 Active")
self.assertEqual(event.action_detail_display, "Allocation approved (New \u2192 Active)")
def test_active_to_revoked_reads_as_revoked(self):
event = self._event(
AuditEvent.Action.ALLOCATION_DISABLED,
old_values={"status": "Active"},
new_values={"status": "Revoked"},
)
self.assertEqual(event.transition_display, "Active \u2192 Revoked")
self.assertEqual(event.action_detail_display, "Allocation revoked (Active \u2192 Revoked)")
def test_missing_old_new_status_falls_back_to_action_label(self):
event = self._event(AuditEvent.Action.ALLOCATION_STATUS_CHANGED)
self.assertEqual(event.transition_display, "")
self.assertEqual(event.action_detail_display, "Allocation status changed")
def test_reconstructed_row_uses_status_transition(self):
event = self._event(
AuditEvent.Action.ALLOCATION_STATUS_CHANGED,
old_values={"status": "Pending"},
new_values={"status": "Active"},
source=AuditEvent.Source.COLDFRONT_HISTORY,
is_reconstructed=True,
)
self.assertEqual(event.transition_display, "Pending \u2192 Active")
self.assertEqual(event.action_detail_display, "Allocation status changed (Pending \u2192 Active)")
def test_live_row_uses_status_transition(self):
event = self._event(
AuditEvent.Action.ALLOCATION_DISABLED,
old_values={"status": "New"},
new_values={"status": "Denied"},
source=AuditEvent.Source.COLDFRONT_SIGNAL,
is_reconstructed=False,
)
self.assertEqual(event.transition_display, "New \u2192 Denied")
self.assertEqual(event.action_detail_display, "Allocation disabled (New \u2192 Denied)")
def test_allocation_renewed_uses_extension_days(self):
event = self._event(
AuditEvent.Action.ALLOCATION_RENEWED,
new_values={"end_date_extension": 365},
)
self.assertEqual(event.transition_display, "+365 days")
self.assertEqual(event.action_detail_display, "Allocation renewed (+365 days)")
def _event(self, action, **kwargs):
defaults = {
"target_type": "allocation.allocation",
"target_id": "1",
"target_repr": "Allocation for Project: Example",
"event_time": timezone.now(),
"source": AuditEvent.Source.RUNTIME,
}
defaults.update(kwargs)
return AuditEvent.objects.create(action=action, **defaults)
class AuditEventEvidenceDisplayTests(TestCase):
def test_ac_au_coldfront_history_display(self):
event = self._event(
control_family="AC,AU",
evidence_category=AuditEvent.EvidenceCategory.AC_AU,
source=AuditEvent.Source.COLDFRONT_HISTORY,
)
self.assertEqual(event.evidence_area_display, "Access Control + Audit Logging")
self.assertEqual(event.source_display, "ColdFront history table")
self.assertEqual(event.evidence_display, "Access Control + Audit Logging\nSource: ColdFront history table")
self.assertIn("Access Control + Audit Logging", self._admin_evidence_text(event))
self.assertIn("Source: ColdFront history table", self._admin_evidence_text(event))
def test_cm_django_admin_log_display(self):
event = self._event(
control_family="CM",
evidence_category=AuditEvent.EvidenceCategory.CM_CONFIGURATION_MANAGEMENT,
source=AuditEvent.Source.DJANGO_ADMIN_LOG,
)
self.assertEqual(event.evidence_area_display, "Configuration Management")
self.assertEqual(event.source_display, "Django admin log")
self.assertEqual(event.evidence_display, "Configuration Management\nSource: Django admin log")
self.assertIn("Configuration Management", self._admin_evidence_text(event))
self.assertIn("Source: Django admin log", self._admin_evidence_text(event))
def test_unknown_source_falls_back_clearly(self):
event = self._event(
control_family="AC",
evidence_category=AuditEvent.EvidenceCategory.AC_ACCESS_CONTROL,
source="unexpected_source",
)
self.assertEqual(event.source_display, "Unknown source (unexpected_source)")
self.assertIn("Unknown source (unexpected_source)", self._admin_evidence_text(event))
def _event(self, **kwargs):
defaults = {
"action": AuditEvent.Action.ADMIN_CHANGE,
"target_type": "auth.user",
"target_id": "1",
"target_repr": "Example user",
"event_time": timezone.now(),
}
defaults.update(kwargs)
return AuditEvent.objects.create(**defaults)
def _admin_evidence_text(self, event):
from .admin import AuditEventAdmin
admin_instance = AuditEventAdmin(AuditEvent, None)
return strip_tags(str(admin_instance.evidence_display(event)))
class ActorTypeFilterTests(TestCase):
def setUp(self):
self.user = get_user_model().objects.create_user(username="named_actor")
self.named_event = self._event(actor=self.user)
self.programmatic_event = self._event(actor=None)
self.reconstructed_programmatic_event = self._event(
actor=None,
is_reconstructed=True,
source=AuditEvent.Source.COLDFRONT_HISTORY,
)
def test_named_actor_filter_returns_only_rows_with_actor(self):
queryset = self._filter("named")
self.assertEqual(list(queryset), [self.named_event])
def test_programmatic_filter_returns_rows_without_actor(self):
queryset = self._filter("programmatic")
self.assertEqual(
set(queryset),
{self.programmatic_event, self.reconstructed_programmatic_event},
)
def test_programmatic_actor_display_is_unchanged(self):
self.assertEqual(self.programmatic_event.actor_display, "programmatic")
self.assertEqual(self.reconstructed_programmatic_event.actor_display, "programmatic")
def _filter(self, value):
from .admin import ActorTypeFilter, AuditEventAdmin
model_admin = AuditEventAdmin(AuditEvent, None)
filter_instance = ActorTypeFilter(None, {"actor_type": value}, AuditEvent, model_admin)
filter_instance.used_parameters = {"actor_type": value}
return filter_instance.queryset(None, AuditEvent.objects.order_by("pk"))
def _event(self, **kwargs):
defaults = {
"action": AuditEvent.Action.ADMIN_CHANGE,
"target_type": "auth.user",
"target_id": "1",
"target_repr": "Example user",
"event_time": timezone.now(),
}
defaults.update(kwargs)
return AuditEvent.objects.create(**defaults)
class AuditEventAdminListDisplayTests(TestCase):
def test_transition_column_is_not_in_list_display(self):
from .admin import AuditEventAdmin
self.assertIn("action_detail_display", AuditEventAdmin.list_display)
self.assertNotIn("transition_display", AuditEventAdmin.list_display)