"""Tests for the ``backfill_cmmc_audit`` command, audit label helpers and ``AuditEvent`` display.

The module covers:

* the ``backfill_cmmc_audit`` management command (dry-run, commit, force rerun),
* the label helpers ``allocation_label``, ``_allocation_repr`` and ``_project_user_repr``,
* the display properties exposed by ``AuditEvent`` and their rendering in the admin,
* the ``ActorTypeFilter`` admin list filter.
"""

from io import StringIO
from types import SimpleNamespace

from django.contrib.admin.models import ADDITION, CHANGE, DELETION, LogEntry
from django.contrib.auth import get_user_model
from django.contrib.contenttypes.models import ContentType
from django.core.management import call_command
from django.test import TestCase
from django.utils import timezone
from django.utils.html import strip_tags

from coldfront.core.allocation.models import Allocation, AllocationStatusChoice
from coldfront.core.field_of_science.models import FieldOfScience
from coldfront.core.project.models import Project, ProjectStatusChoice
from coldfront.core.resource.models import Resource, ResourceType
from coldfront.core.user.models import UserProfile

from .backfill import RUN_NAME, _allocation_repr, _project_user_repr
from .models import AuditEvent, BackfillRun
from .resolvers import allocation_label


class _AdminChangeEventMixin:
    """Shared ``_event`` factory for test cases that need a generic ``ADMIN_CHANGE`` row.

    This is a plain mixin (not a ``TestCase`` subclass), so it is combined with
    ``TestCase`` by the classes that use it rather than run on its own.
    """

    def _event(self, **kwargs):
        """Create and return an ``AuditEvent`` targeting an example ``auth.user``.

        Args:
            **kwargs: Field values that override or extend the defaults below.

        Returns:
            The saved ``AuditEvent`` instance.
        """
        defaults = {
            "action": AuditEvent.Action.ADMIN_CHANGE,
            "target_type": "auth.user",
            "target_id": "1",
            "target_repr": "Example user",
            # Evaluated per call so every row gets a fresh timestamp.
            "event_time": timezone.now(),
        }
        defaults.update(kwargs)
        return AuditEvent.objects.create(**defaults)


class BackfillCmmcAuditCommandTests(TestCase):
    """Exercise the ``backfill_cmmc_audit`` command and the audit label helpers."""

    def setUp(self):
        """Create the staff user used as the ``LogEntry`` author and default target type."""
        self.user = get_user_model().objects.create_user(
            username="audit_backfill_admin",
            email="audit_backfill_admin@example.test",
            password="unused",
            is_staff=True,
        )
        self.content_type = ContentType.objects.get_for_model(get_user_model())

    def _log_entry(self, action_flag, object_id, object_repr, change_message="", content_type=None):
        """Create a Django admin ``LogEntry`` authored by ``self.user``.

        Args:
            action_flag: One of ``ADDITION``, ``CHANGE`` or ``DELETION``.
            object_id: Primary key of the logged object; stored as a string.
            object_repr: Text stored as the entry's ``object_repr``.
            change_message: Raw change message stored on the entry.
            content_type: Content type of the logged object; falls back to the
                user model's content type when falsy.

        Returns:
            The saved ``LogEntry``.
        """
        return LogEntry.objects.create(
            user=self.user,
            content_type=content_type or self.content_type,
            object_id=str(object_id),
            object_repr=object_repr,
            action_flag=action_flag,
            change_message=change_message,
        )

    def _run_command(self, *args):
        """Run ``backfill_cmmc_audit`` with ``args`` and return its captured stdout text."""
        out = StringIO()
        call_command("backfill_cmmc_audit", *args, stdout=out)
        return out.getvalue()

    def test_dry_run_creates_no_rows(self):
        self._log_entry(ADDITION, self.user.pk, self.user.username)

        output = self._run_command("--dry-run")

        self.assertIn("Mode: dry-run", output)
        self.assertEqual(AuditEvent.objects.count(), 0)
        self.assertEqual(BackfillRun.objects.count(), 0)

    def test_commit_creates_reconstructed_rows_from_admin_log(self):
        log_entry = self._log_entry(ADDITION, self.user.pk, self.user.username)

        self._run_command("--commit")

        # .get() without filters also asserts that exactly one event exists.
        event = AuditEvent.objects.get()
        self.assertEqual(event.action, AuditEvent.Action.ADMIN_ADDITION)
        self.assertEqual(event.source, AuditEvent.Source.DJANGO_ADMIN_LOG)
        self.assertEqual(event.source_id, str(log_entry.pk))
        self.assertEqual(event.event_time, log_entry.action_time)
        self.assertTrue(event.is_reconstructed)
        self.assertEqual(BackfillRun.objects.get(name=RUN_NAME).created_events, 1)

    def test_second_commit_without_force_does_not_rerun(self):
        self._log_entry(ADDITION, self.user.pk, self.user.username)
        self._run_command("--commit")

        output = self._run_command("--commit")

        self.assertIn("use --force to rerun", output)
        self.assertEqual(AuditEvent.objects.count(), 1)
        self.assertEqual(BackfillRun.objects.count(), 1)

    def test_force_rerun_does_not_duplicate_rows(self):
        self._log_entry(ADDITION, self.user.pk, self.user.username)
        self._run_command("--commit")

        output = self._run_command("--commit", "--force")

        self.assertIn("Duplicates skipped: 1", output)
        self.assertEqual(AuditEvent.objects.count(), 1)
        self.assertEqual(BackfillRun.objects.count(), 1)

    def test_admin_add_change_delete_mapping(self):
        target = get_user_model().objects.create_user(username="mapped_user")
        self._log_entry(ADDITION, target.pk, target.username)
        self._log_entry(CHANGE, target.pk, target.username, '[{"changed": {"fields": ["Email"]}}]')
        self._log_entry(DELETION, target.pk, target.username)

        self._run_command("--commit")

        self.assertEqual(
            set(AuditEvent.objects.values_list("action", flat=True)),
            {
                AuditEvent.Action.ADMIN_ADDITION,
                AuditEvent.Action.ADMIN_CHANGE,
                AuditEvent.Action.ADMIN_DELETION,
            },
        )

    def test_duplicate_source_id_action_is_skipped(self):
        log_entry = self._log_entry(CHANGE, self.user.pk, self.user.username)
        # Pre-seed an event that already points at this log entry.
        AuditEvent.objects.create(
            action=AuditEvent.Action.ADMIN_CHANGE,
            actor=self.user,
            target_type="auth.user",
            target_id=str(self.user.pk),
            target_repr=self.user.username,
            source=AuditEvent.Source.DJANGO_ADMIN_LOG,
            source_id=str(log_entry.pk),
            event_time=timezone.now(),
            is_reconstructed=True,
        )

        output = self._run_command("--dry-run")

        self.assertIn("Duplicates skipped: 1", output)
        self.assertEqual(AuditEvent.objects.count(), 1)

    def test_project_deletion_from_admin_log_backfills_project_deleted(self):
        project_content_type = ContentType.objects.get_for_model(Project)
        # No Project row with this id is created; only the log entry exists.
        self._log_entry(
            DELETION,
            2016540,
            "Quantum Materials Access Review",
            "",
            content_type=project_content_type,
        )

        self._run_command("--commit")

        event = AuditEvent.objects.get(action=AuditEvent.Action.PROJECT_DELETED)
        self.assertEqual(event.target_type, "project.project")
        self.assertEqual(event.target_id, "2016540")
        self.assertEqual(
            event.target_repr,
            "Deleted project: Quantum Materials Access Review (id 2016540)",
        )
        self.assertEqual(event.target_summary, event.target_repr)

    def test_live_user_profile_pi_event_display_uses_user_label(self):
        target = get_user_model().objects.create_user(
            username="live_pi",
            first_name="Live",
            last_name="Person",
        )
        target.userprofile.is_pi = True
        # NOTE(review): no command is run here, so the event asserted below is
        # presumably emitted by a handler hooked to this save - confirm.
        target.userprofile.save()

        event = AuditEvent.objects.get(action=AuditEvent.Action.USER_PI_UPGRADED)
        self.assertEqual(event.target_type, "auth.user")
        self.assertEqual(event.target_id, str(target.pk))
        self.assertEqual(event.target_repr, "live_pi (Live Person)")
        self.assertIn("PI status changed for live_pi (Live Person): upgraded to PI", event.message)

    def test_reconstructed_user_profile_logentry_display_uses_related_user_label(self):
        target = get_user_model().objects.create_user(
            username="profile_pi",
            first_name="Profile",
            last_name="Person",
        )
        profile_content_type = ContentType.objects.get_for_model(UserProfile)
        self._log_entry(
            CHANGE,
            target.userprofile.pk,
            f"UserProfile object ({target.userprofile.pk})",
            '[{"changed": {"fields": ["Is pi"]}}]',
            content_type=profile_content_type,
        )

        self._run_command("--commit")

        event = AuditEvent.objects.get(action=AuditEvent.Action.USER_PI_UPGRADED)
        self.assertEqual(event.target_type, "user.userprofile")
        self.assertEqual(event.target_id, str(target.userprofile.pk))
        self.assertEqual(event.target_repr, "profile_pi (Profile Person)")
        self.assertEqual(
            event.message,
            "Reconstructed from Django admin log: PI status changed for "
            "profile_pi (Profile Person); direction unknown",
        )

    def test_reconstructed_user_profile_logentry_uses_missing_profile_fallback_label(self):
        profile_content_type = ContentType.objects.get_for_model(UserProfile)
        # The log entry points at a profile id that this test never creates.
        self._log_entry(
            CHANGE,
            999999,
            "UserProfile object (999999)",
            '[{"changed": {"fields": ["Is pi"]}}]',
            content_type=profile_content_type,
        )

        self._run_command("--commit")

        event = AuditEvent.objects.get(action=AuditEvent.Action.USER_PI_UPGRADED)
        self.assertEqual(event.target_repr, "UserProfile id 999999 (user not found)")
        self.assertEqual(
            event.message,
            "Reconstructed from Django admin log: PI status changed for "
            "UserProfile id 999999 (user not found); direction unknown",
        )

    def test_one_resource_allocation_label(self):
        project = self._project(title="Quantum Materials Access Review")
        allocation = self._allocation(project=project)
        self._resource("CARC Project Scratch", allocation=allocation)

        label = allocation_label(allocation)

        self.assertEqual(label, f"CARC Project Scratch allocation for Project: {project.title}")

    def test_storage_allocation_label(self):
        project = self._project(title="Viral sequencing")
        allocation = self._allocation(project=project)
        self._resource("CARC Project Storage", allocation=allocation)

        label = allocation_label(allocation)

        self.assertEqual(label, f"CARC Project Storage allocation for Project: {project.title}")

    def test_allocation_label_omits_resource_when_missing(self):
        project = self._project(title="Quantum Materials Access Review")
        allocation = self._allocation(project=project)

        label = allocation_label(allocation)

        self.assertEqual(label, f"Allocation for Project: {project.title}")

    def test_allocation_target_summary_uses_label_without_allocation_id(self):
        project = self._project(title="Quantum Materials Access Review")
        allocation = self._allocation(project=project)
        self._resource("CARC Project Scratch", allocation=allocation)

        # The stored target_repr is deliberately different from the expected summary.
        event = AuditEvent.objects.create(
            action=AuditEvent.Action.ALLOCATION_STATUS_CHANGED,
            actor=self.user,
            target_type="allocation.allocation",
            target_id=str(allocation.pk),
            target_repr=f"Allocation {allocation.pk}, stale label",
            source=AuditEvent.Source.COLDFRONT_HISTORY,
            event_time=timezone.now(),
        )

        self.assertEqual(event.target_summary, f"CARC Project Scratch allocation for Project: {project.title}")
        self.assertNotIn(str(allocation.pk), event.target_summary)

    def test_historical_allocation_label_resolves_project_id_without_allocation_id(self):
        project = self._project(title="Quantum Materials Access Review")
        # SimpleNamespace stands in for a history row; only these attributes are set.
        history = SimpleNamespace(id=1471, project_id=project.pk)

        label = _allocation_repr(history)

        self.assertEqual(label, f"Allocation for Project: {project.title}")

    def test_historical_allocation_label_marks_missing_project_id_with_resource_name(self):
        history = SimpleNamespace(id=1471, project_id=2016541, resource_names=["CARC Project Scratch"])

        label = _allocation_repr(history)

        self.assertEqual(label, "CARC Project Scratch allocation for Project id 2016541 (not found)")

    def test_historical_allocation_label_marks_missing_project_id(self):
        history = SimpleNamespace(id=1471, project_id=2016541)

        label = _allocation_repr(history)

        self.assertEqual(label, "Allocation for Project id 2016541 (not found)")

    def test_historical_project_user_label_resolves_project_id(self):
        project = self._project(title="Quantum Materials Access Review")
        history = SimpleNamespace(id=2001, project_id=project.pk, user_id=self.user.pk)

        label = _project_user_repr(history)

        self.assertIn(f"on Project: {project.title}", label)
        self.assertIn(f"{self.user.username} (id {self.user.pk})", label)

    def test_historical_project_user_label_marks_missing_project_id(self):
        history = SimpleNamespace(id=2001, project_id=2016541, user_id=self.user.pk)

        label = _project_user_repr(history)

        self.assertIn("Project id 2016541 (not found)", label)
        self.assertIn(f"{self.user.username} (id {self.user.pk})", label)

    def _project(self, title):
        """Create a ``Project`` owned by ``self.user`` with its own status and field of science.

        The status name and field-of-science description both embed ``title``.
        """
        status = ProjectStatusChoice.objects.create(name=f"Active {title}")
        field_of_science = FieldOfScience.objects.create(description=f"FOS {title}")
        return Project.objects.create(
            title=title,
            pi=self.user,
            description="A sufficiently detailed project description.",
            field_of_science=field_of_science,
            status=status,
        )

    def _allocation(self, project):
        """Create an ``Allocation`` for ``project`` with a status named after the project title."""
        status = AllocationStatusChoice.objects.create(name=f"Active {project.title}")
        return Allocation.objects.create(
            project=project,
            status=status,
            justification="Testing audit allocation labels.",
        )

    def _resource(self, name, allocation):
        """Create a ``Resource`` called ``name`` and attach it to ``allocation``.

        The "Storage" ``ResourceType`` is fetched or created, so repeated calls
        within one test reuse the same type.

        Returns:
            The saved ``Resource``.
        """
        resource_type, _ = ResourceType.objects.get_or_create(
            name="Storage",
            defaults={"description": "Storage resource type"},
        )
        resource = Resource.objects.create(
            name=name,
            description="Testing audit allocation labels.",
            resource_type=resource_type,
        )
        allocation.resources.add(resource)
        return resource


class AuditEventProjectDisplayTests(TestCase):
    """Check project-related ``AuditEvent`` summaries and events created on project saves."""

    def test_missing_project_id_display_is_explicit(self):
        event = AuditEvent.objects.create(
            action=AuditEvent.Action.PROJECT_DELETED,
            target_type="project.project",
            target_id="2016540",
            event_time=timezone.now(),
        )

        self.assertEqual(event.target_summary, "Project id 2016540 (not found)")

    def test_missing_deleted_project_uses_stored_target_repr(self):
        event = AuditEvent.objects.create(
            action=AuditEvent.Action.PROJECT_DELETED,
            target_type="project.project",
            target_id="2016540",
            target_repr="Deleted project: Quantum Materials Access Review (id 2016540)",
            event_time=timezone.now(),
        )

        self.assertEqual(event.target_summary, "Deleted project: Quantum Materials Access Review (id 2016540)")

    def test_project_archive_status_transition_creates_readable_event(self):
        user = get_user_model().objects.create_user(username="project_archive_pi")
        project = self._project(user, "Quantum Materials Access Review", "New")
        archived = ProjectStatusChoice.objects.create(name="Archived")
        # Drop anything recorded during setup so only events from the save below remain.
        AuditEvent.objects.all().delete()

        project.status = archived
        project.save()

        event = AuditEvent.objects.get(action=AuditEvent.Action.PROJECT_ARCHIVED)
        self.assertEqual(event.target_repr, f"Project: {project.title}")
        self.assertEqual(event.old_values["status"], "New")
        self.assertEqual(event.new_values["status"], "Archived")
        self.assertEqual(
            event.message,
            f"Project status changed for Project: {project.title}: New -> Archived",
        )

    def test_project_pi_change_creates_readable_event(self):
        old_pi = get_user_model().objects.create_user(username="old_project_pi")
        new_pi = get_user_model().objects.create_user(username="new_project_pi")
        project = self._project(old_pi, "Quantum Materials Access Review", "Active")
        # Drop anything recorded during setup so only events from the save below remain.
        AuditEvent.objects.all().delete()

        project.pi = new_pi
        project.save()

        event = AuditEvent.objects.get(action=AuditEvent.Action.PROJECT_PI_CHANGED)
        self.assertEqual(event.target_repr, f"Project: {project.title}")
        self.assertEqual(event.old_values["pi_id"], old_pi.pk)
        self.assertEqual(event.new_values["pi_id"], new_pi.pk)
        self.assertEqual(event.message, f"Project PI changed for Project: {project.title}")

    def _project(self, user, title, status_name):
        """Create a ``Project`` with PI ``user`` whose status is named exactly ``status_name``.

        Args:
            user: The project's PI.
            title: Project title.
            status_name: Final ``name`` of the project's ``ProjectStatusChoice``.

        Returns:
            The saved ``Project``.
        """
        # NOTE(review): the status is created under a title-specific name and
        # only then renamed to ``status_name``; presumably this sidesteps
        # something that reacts to the name at creation time - confirm.
        status = ProjectStatusChoice.objects.create(name=f"{status_name} {title}")
        status.name = status_name
        status.save(update_fields=["name"])
        field_of_science = FieldOfScience.objects.create(description=f"FOS {title} {status_name}")
        return Project.objects.create(
            title=title,
            pi=user,
            description="A sufficiently detailed project description.",
            field_of_science=field_of_science,
            status=status,
        )


class AuditEventAllocationDisplayTests(TestCase):
    """Check ``transition_display`` and ``action_detail_display`` for allocation events."""

    def test_new_to_active_reads_as_approved(self):
        event = self._event(
            AuditEvent.Action.ALLOCATION_STATUS_CHANGED,
            old_values={"status": "New"},
            new_values={"status": "Active"},
        )

        self.assertEqual(event.transition_display, "New \u2192 Active")
        self.assertEqual(event.action_detail_display, "Allocation approved (New \u2192 Active)")

    def test_active_to_revoked_reads_as_revoked(self):
        event = self._event(
            AuditEvent.Action.ALLOCATION_DISABLED,
            old_values={"status": "Active"},
            new_values={"status": "Revoked"},
        )

        self.assertEqual(event.transition_display, "Active \u2192 Revoked")
        self.assertEqual(event.action_detail_display, "Allocation revoked (Active \u2192 Revoked)")

    def test_missing_old_new_status_falls_back_to_action_label(self):
        event = self._event(AuditEvent.Action.ALLOCATION_STATUS_CHANGED)

        self.assertEqual(event.transition_display, "")
        self.assertEqual(event.action_detail_display, "Allocation status changed")

    def test_reconstructed_row_uses_status_transition(self):
        event = self._event(
            AuditEvent.Action.ALLOCATION_STATUS_CHANGED,
            old_values={"status": "Pending"},
            new_values={"status": "Active"},
            source=AuditEvent.Source.COLDFRONT_HISTORY,
            is_reconstructed=True,
        )

        self.assertEqual(event.transition_display, "Pending \u2192 Active")
        self.assertEqual(event.action_detail_display, "Allocation status changed (Pending \u2192 Active)")

    def test_live_row_uses_status_transition(self):
        event = self._event(
            AuditEvent.Action.ALLOCATION_DISABLED,
            old_values={"status": "New"},
            new_values={"status": "Denied"},
            source=AuditEvent.Source.COLDFRONT_SIGNAL,
            is_reconstructed=False,
        )

        self.assertEqual(event.transition_display, "New \u2192 Denied")
        self.assertEqual(event.action_detail_display, "Allocation disabled (New \u2192 Denied)")

    def test_allocation_renewed_uses_extension_days(self):
        event = self._event(
            AuditEvent.Action.ALLOCATION_RENEWED,
            new_values={"end_date_extension": 365},
        )

        self.assertEqual(event.transition_display, "+365 days")
        self.assertEqual(event.action_detail_display, "Allocation renewed (+365 days)")

    def _event(self, action, **kwargs):
        """Create an allocation-targeted ``AuditEvent`` with the given ``action``.

        Args:
            action: The ``AuditEvent.Action`` value to store.
            **kwargs: Field values that override or extend the defaults below.

        Returns:
            The saved ``AuditEvent``.
        """
        defaults = {
            "target_type": "allocation.allocation",
            "target_id": "1",
            "target_repr": "Allocation for Project: Example",
            "event_time": timezone.now(),
            "source": AuditEvent.Source.RUNTIME,
        }
        defaults.update(kwargs)
        return AuditEvent.objects.create(action=action, **defaults)


class AuditEventEvidenceDisplayTests(_AdminChangeEventMixin, TestCase):
    """Check evidence-area and source labels on the model and in the admin column."""

    def test_ac_au_coldfront_history_display(self):
        event = self._event(
            control_family="AC,AU",
            evidence_category=AuditEvent.EvidenceCategory.AC_AU,
            source=AuditEvent.Source.COLDFRONT_HISTORY,
        )

        self.assertEqual(event.evidence_area_display, "Access Control + Audit Logging")
        self.assertEqual(event.source_display, "ColdFront history table")
        self.assertEqual(event.evidence_display, "Access Control + Audit Logging\nSource: ColdFront history table")
        # Render the admin column once and check both fragments against it.
        admin_text = self._admin_evidence_text(event)
        self.assertIn("Access Control + Audit Logging", admin_text)
        self.assertIn("Source: ColdFront history table", admin_text)

    def test_cm_django_admin_log_display(self):
        event = self._event(
            control_family="CM",
            evidence_category=AuditEvent.EvidenceCategory.CM_CONFIGURATION_MANAGEMENT,
            source=AuditEvent.Source.DJANGO_ADMIN_LOG,
        )

        self.assertEqual(event.evidence_area_display, "Configuration Management")
        self.assertEqual(event.source_display, "Django admin log")
        self.assertEqual(event.evidence_display, "Configuration Management\nSource: Django admin log")
        # Render the admin column once and check both fragments against it.
        admin_text = self._admin_evidence_text(event)
        self.assertIn("Configuration Management", admin_text)
        self.assertIn("Source: Django admin log", admin_text)

    def test_unknown_source_falls_back_clearly(self):
        event = self._event(
            control_family="AC",
            evidence_category=AuditEvent.EvidenceCategory.AC_ACCESS_CONTROL,
            source="unexpected_source",
        )

        self.assertEqual(event.source_display, "Unknown source (unexpected_source)")
        self.assertIn("Unknown source (unexpected_source)", self._admin_evidence_text(event))

    def _admin_evidence_text(self, event):
        """Return the admin ``evidence_display`` output for ``event`` with HTML tags stripped."""
        # Kept as a function-local import, matching the other admin-using helpers in this module.
        from .admin import AuditEventAdmin

        admin_instance = AuditEventAdmin(AuditEvent, None)
        return strip_tags(str(admin_instance.evidence_display(event)))


class ActorTypeFilterTests(_AdminChangeEventMixin, TestCase):
    """Check that ``ActorTypeFilter`` separates rows with an actor from rows without one."""

    def setUp(self):
        """Create one event with an actor and two without (one of them reconstructed)."""
        self.user = get_user_model().objects.create_user(username="named_actor")
        self.named_event = self._event(actor=self.user)
        self.programmatic_event = self._event(actor=None)
        self.reconstructed_programmatic_event = self._event(
            actor=None,
            is_reconstructed=True,
            source=AuditEvent.Source.COLDFRONT_HISTORY,
        )

    def test_named_actor_filter_returns_only_rows_with_actor(self):
        queryset = self._filter("named")

        self.assertEqual(list(queryset), [self.named_event])

    def test_programmatic_filter_returns_rows_without_actor(self):
        queryset = self._filter("programmatic")

        self.assertEqual(
            set(queryset),
            {self.programmatic_event, self.reconstructed_programmatic_event},
        )

    def test_programmatic_actor_display_is_unchanged(self):
        self.assertEqual(self.programmatic_event.actor_display, "programmatic")
        self.assertEqual(self.reconstructed_programmatic_event.actor_display, "programmatic")

    def _filter(self, value):
        """Apply ``ActorTypeFilter`` with ``actor_type=value`` to all events, ordered by pk.

        Args:
            value: The ``actor_type`` filter value, e.g. ``"named"`` or ``"programmatic"``.

        Returns:
            Whatever ``ActorTypeFilter.queryset`` returns for the pk-ordered
            ``AuditEvent`` queryset.
        """
        from .admin import ActorTypeFilter, AuditEventAdmin

        model_admin = AuditEventAdmin(AuditEvent, None)
        filter_instance = ActorTypeFilter(None, {"actor_type": value}, AuditEvent, model_admin)
        # Force the parsed parameters to the plain string, regardless of how the
        # filter constructor normalised ``params``.
        filter_instance.used_parameters = {"actor_type": value}
        return filter_instance.queryset(None, AuditEvent.objects.order_by("pk"))


class AuditEventAdminListDisplayTests(TestCase):
    """Check which display columns ``AuditEventAdmin.list_display`` exposes."""

    def test_transition_column_is_not_in_list_display(self):
        from .admin import AuditEventAdmin

        self.assertIn("action_detail_display", AuditEventAdmin.list_display)
        self.assertNotIn("transition_display", AuditEventAdmin.list_display)