"""Helpers for recording ``AuditEvent`` rows.

``log_event`` is the entry point. The private helpers in this module derive
the client IP address and a bounded, human-readable target label.
"""

from .middleware import get_current_actor, get_current_request
from .models import AuditEvent
from .resolvers import user_profile_label

# Default ``(evidence_category, control_family)`` pair for each audit action.
# ``log_event`` consults this mapping only when the caller leaves one or both
# of those arguments empty. The control-family strings ("AU", "AC", "CM",
# "AC,AU") mirror the ``EvidenceCategory`` member names (audit accountability,
# access control, configuration management).
# NOTE(review): these look like NIST 800-53 style control families -- confirm.
ACTION_EVIDENCE_MAP = {
    # Admin-site changes.
    AuditEvent.Action.ADMIN_ADDITION: (
        AuditEvent.EvidenceCategory.AU_AUDIT_ACCOUNTABILITY,
        "AU",
    ),
    AuditEvent.Action.ADMIN_CHANGE: (
        AuditEvent.EvidenceCategory.AU_AUDIT_ACCOUNTABILITY,
        "AU",
    ),
    AuditEvent.Action.ADMIN_DELETION: (
        AuditEvent.EvidenceCategory.AU_AUDIT_ACCOUNTABILITY,
        "AU",
    ),
    # User privilege changes.
    AuditEvent.Action.USER_PI_UPGRADED: (
        AuditEvent.EvidenceCategory.AC_ACCESS_CONTROL,
        "AC",
    ),
    AuditEvent.Action.PI_STATUS_REVOKED: (
        AuditEvent.EvidenceCategory.AC_ACCESS_CONTROL,
        "AC",
    ),
    AuditEvent.Action.USER_ADMIN_PRIVILEGES_CHANGED: (
        AuditEvent.EvidenceCategory.AC_ACCESS_CONTROL,
        "AC",
    ),
    # Allocation lifecycle.
    AuditEvent.Action.ALLOCATION_REQUESTED: (
        AuditEvent.EvidenceCategory.AC_AU,
        "AC,AU",
    ),
    AuditEvent.Action.ALLOCATION_CHANGE_REQUESTED: (
        AuditEvent.EvidenceCategory.AC_AU,
        "AC,AU",
    ),
    AuditEvent.Action.ALLOCATION_STATUS_CHANGED: (
        AuditEvent.EvidenceCategory.AC_AU,
        "AC,AU",
    ),
    AuditEvent.Action.ALLOCATION_DISABLED: (
        AuditEvent.EvidenceCategory.AC_AU,
        "AC,AU",
    ),
    AuditEvent.Action.RENEWAL_REQUESTED: (
        AuditEvent.EvidenceCategory.AC_AU,
        "AC,AU",
    ),
    AuditEvent.Action.RENEWAL_APPROVED: (
        AuditEvent.EvidenceCategory.AC_AU,
        "AC,AU",
    ),
    AuditEvent.Action.ALLOCATION_RENEWED: (
        AuditEvent.EvidenceCategory.AC_AU,
        "AC,AU",
    ),
    # Project membership.
    AuditEvent.Action.PROJECT_USER_ADDED: (
        AuditEvent.EvidenceCategory.AC_ACCESS_CONTROL,
        "AC",
    ),
    AuditEvent.Action.PROJECT_USER_REMOVED: (
        AuditEvent.EvidenceCategory.AC_ACCESS_CONTROL,
        "AC",
    ),
    AuditEvent.Action.PROJECT_USER_ROLE_CHANGED: (
        AuditEvent.EvidenceCategory.AC_ACCESS_CONTROL,
        "AC",
    ),
    # Project lifecycle.
    AuditEvent.Action.PROJECT_CREATED: (
        AuditEvent.EvidenceCategory.AU_AUDIT_ACCOUNTABILITY,
        "AU",
    ),
    AuditEvent.Action.PROJECT_ARCHIVED: (
        AuditEvent.EvidenceCategory.AC_AU,
        "AC,AU",
    ),
    AuditEvent.Action.PROJECT_DELETED: (
        AuditEvent.EvidenceCategory.AC_AU,
        "AC,AU",
    ),
    AuditEvent.Action.PROJECT_STATUS_CHANGED: (
        AuditEvent.EvidenceCategory.AC_AU,
        "AC,AU",
    ),
    AuditEvent.Action.PROJECT_PI_CHANGED: (
        AuditEvent.EvidenceCategory.AC_ACCESS_CONTROL,
        "AC",
    ),
    # Project reviews.
    AuditEvent.Action.PROJECT_REVIEW_FORCED: (
        AuditEvent.EvidenceCategory.AU_AUDIT_ACCOUNTABILITY,
        "AU",
    ),
    AuditEvent.Action.PROJECT_REVIEW_SUBMITTED: (
        AuditEvent.EvidenceCategory.AU_AUDIT_ACCOUNTABILITY,
        "AU",
    ),
    AuditEvent.Action.PROJECT_REVIEW_COMPLETED: (
        AuditEvent.EvidenceCategory.AU_AUDIT_ACCOUNTABILITY,
        "AU",
    ),
    AuditEvent.Action.PROJECT_REVIEW_STATUS_CHANGED: (
        AuditEvent.EvidenceCategory.AU_AUDIT_ACCOUNTABILITY,
        "AU",
    ),
    # Resource configuration.
    AuditEvent.Action.RESOURCE_CREATED: (
        AuditEvent.EvidenceCategory.CM_CONFIGURATION_MANAGEMENT,
        "CM",
    ),
    AuditEvent.Action.RESOURCE_CHANGED: (
        AuditEvent.EvidenceCategory.CM_CONFIGURATION_MANAGEMENT,
        "CM",
    ),
    AuditEvent.Action.RESOURCE_DELETED: (
        AuditEvent.EvidenceCategory.CM_CONFIGURATION_MANAGEMENT,
        "CM",
    ),
}


def _get_request_ip(request):
    """Return the client IP address for *request*, or ``None`` if unknown.

    The first entry of the ``X-Forwarded-For`` header is preferred. When the
    header is absent, or its first entry is blank, ``REMOTE_ADDR`` is used
    instead.

    Args:
        request: An object exposing a ``META`` mapping.

    Returns:
        The address as a string, or ``None`` when neither source yields a
        non-empty value.
    """
    # NOTE(review): X-Forwarded-For is client-controlled unless a trusted
    # proxy overwrites it, and the value is not validated as an IP address
    # here -- confirm the deployment and the model field tolerate that.
    forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR")
    if forwarded_for:
        client_ip = forwarded_for.split(",")[0].strip()
        if client_ip:
            return client_ip
        # A blank first entry carries no information; fall through to
        # REMOTE_ADDR rather than discarding the address altogether.
    return request.META.get("REMOTE_ADDR") or None


def _get_target_repr(target):
    """Return a label for *target*, truncated to 255 characters.

    Targets whose model label is ``user.userprofile`` are labelled through
    ``user_profile_label``. Every other target uses ``str(target)``, and if
    that raises, a generic ``"<app_label>.<model_name> object (<pk>)"`` string
    is returned instead.

    Args:
        target: An object exposing ``_meta`` (``label_lower``, ``app_label``,
            ``model_name``).

    Returns:
        A string of at most 255 characters.
    """
    if target._meta.label_lower == "user.userprofile":
        return user_profile_label(target)[:255]
    try:
        return str(target)[:255]
    except Exception:
        # Deliberate best-effort: a failing __str__ must not prevent the
        # audit event from being recorded.
        meta = target._meta
        pk = getattr(target, "pk", "")
        return f"{meta.app_label}.{meta.model_name} object ({pk})"[:255]


def log_event(
    action,
    target,
    old_values=None,
    new_values=None,
    message="",
    actor=None,
    request=None,
    evidence_category="",
    control_family="",
    source=AuditEvent.Source.RUNTIME,
    source_id="",
    is_reconstructed=False,
    event_time=None,
    target_repr="",
):
    """Create and return an ``AuditEvent`` describing *action* on *target*.

    Args:
        action: The audit action; also the lookup key into
            ``ACTION_EVIDENCE_MAP`` for default evidence metadata.
        target: The object acted upon. Must expose ``_meta``. When it is
            ``None`` or itself an ``AuditEvent``, nothing is recorded.
        old_values: Values before the change; stored as ``{}`` when falsy.
        new_values: Values after the change; stored as ``{}`` when falsy.
        message: Free-form text, coerced to ``str``.
        actor: The acting user. Defaults to ``get_current_actor()``. An actor
            whose ``is_authenticated`` attribute is missing or ``False`` is
            stored as ``None``.
        request: The request to take the path and IP address from. Defaults
            to ``get_current_request()``.
        evidence_category: Overrides the default mapped from *action*.
        control_family: Overrides the default mapped from *action*.
        source: Origin of the event.
        source_id: Identifier within *source*; truncated to 128 characters.
        is_reconstructed: Stored on the event as given.
        event_time: Passed to the model only when not ``None``.
        target_repr: Overrides the computed target label; truncated to 255
            characters.

    Returns:
        The created ``AuditEvent``, or ``None`` when *target* is ``None`` or
        an ``AuditEvent``.
    """
    # Never audit a missing target or the audit records themselves.
    if target is None or isinstance(target, AuditEvent):
        return None

    request = request if request is not None else get_current_request()
    actor = actor if actor is not None else get_current_actor()
    if actor is not None and getattr(actor, "is_authenticated", False) is False:
        actor = None

    # Fill in whichever evidence fields the caller left empty.
    if not evidence_category or not control_family:
        default_evidence_category, default_control_family = ACTION_EVIDENCE_MAP.get(
            action, ("", "")
        )
        evidence_category = evidence_category or default_evidence_category
        control_family = control_family or default_control_family

    meta = target._meta

    request_path = ""
    ip_address = None
    if request is not None:
        # ``or ""`` guards request-like objects whose ``path`` is None.
        request_path = (getattr(request, "path", "") or "")[:255]
        ip_address = _get_request_ip(request)

    # Only a missing primary key maps to ""; a falsy but valid key such as 0
    # must still be recorded.
    pk = getattr(target, "pk", None)
    target_id = "" if pk is None else str(pk)

    values = {
        "actor": actor,
        "action": action,
        "evidence_category": evidence_category,
        "control_family": control_family,
        "target_type": f"{meta.app_label}.{meta.model_name}",
        "target_id": target_id[:128],
        "target_repr": (target_repr or _get_target_repr(target))[:255],
        "old_values": old_values or {},
        "new_values": new_values or {},
        "message": str(message or ""),
        "source": source,
        "source_id": str(source_id or "")[:128],
        "is_reconstructed": is_reconstructed,
        "request_path": request_path,
        "ip_address": ip_address,
    }
    # Omit the key when unset so the model's own handling of event_time
    # applies.
    if event_time is not None:
        values["event_time"] = event_time
    return AuditEvent.objects.create(**values)