CMMC-Audit/utils.py
2026-05-27 14:45:29 -06:00

196 lines
5.9 KiB
Python

from .middleware import get_current_actor, get_current_request
from .models import AuditEvent
from .resolvers import user_profile_label
ACTION_EVIDENCE_MAP = {
AuditEvent.Action.ADMIN_ADDITION: (
AuditEvent.EvidenceCategory.AU_AUDIT_ACCOUNTABILITY,
"AU",
),
AuditEvent.Action.ADMIN_CHANGE: (
AuditEvent.EvidenceCategory.AU_AUDIT_ACCOUNTABILITY,
"AU",
),
AuditEvent.Action.ADMIN_DELETION: (
AuditEvent.EvidenceCategory.AU_AUDIT_ACCOUNTABILITY,
"AU",
),
AuditEvent.Action.USER_PI_UPGRADED: (
AuditEvent.EvidenceCategory.AC_ACCESS_CONTROL,
"AC",
),
AuditEvent.Action.PI_STATUS_REVOKED: (
AuditEvent.EvidenceCategory.AC_ACCESS_CONTROL,
"AC",
),
AuditEvent.Action.USER_ADMIN_PRIVILEGES_CHANGED: (
AuditEvent.EvidenceCategory.AC_ACCESS_CONTROL,
"AC",
),
AuditEvent.Action.ALLOCATION_REQUESTED: (
AuditEvent.EvidenceCategory.AC_AU,
"AC,AU",
),
AuditEvent.Action.ALLOCATION_CHANGE_REQUESTED: (
AuditEvent.EvidenceCategory.AC_AU,
"AC,AU",
),
AuditEvent.Action.ALLOCATION_STATUS_CHANGED: (
AuditEvent.EvidenceCategory.AC_AU,
"AC,AU",
),
AuditEvent.Action.ALLOCATION_DISABLED: (
AuditEvent.EvidenceCategory.AC_AU,
"AC,AU",
),
AuditEvent.Action.RENEWAL_REQUESTED: (
AuditEvent.EvidenceCategory.AC_AU,
"AC,AU",
),
AuditEvent.Action.RENEWAL_APPROVED: (
AuditEvent.EvidenceCategory.AC_AU,
"AC,AU",
),
AuditEvent.Action.ALLOCATION_RENEWED: (
AuditEvent.EvidenceCategory.AC_AU,
"AC,AU",
),
AuditEvent.Action.PROJECT_USER_ADDED: (
AuditEvent.EvidenceCategory.AC_ACCESS_CONTROL,
"AC",
),
AuditEvent.Action.PROJECT_USER_REMOVED: (
AuditEvent.EvidenceCategory.AC_ACCESS_CONTROL,
"AC",
),
AuditEvent.Action.PROJECT_USER_ROLE_CHANGED: (
AuditEvent.EvidenceCategory.AC_ACCESS_CONTROL,
"AC",
),
AuditEvent.Action.PROJECT_CREATED: (
AuditEvent.EvidenceCategory.AU_AUDIT_ACCOUNTABILITY,
"AU",
),
AuditEvent.Action.PROJECT_ARCHIVED: (
AuditEvent.EvidenceCategory.AC_AU,
"AC,AU",
),
AuditEvent.Action.PROJECT_DELETED: (
AuditEvent.EvidenceCategory.AC_AU,
"AC,AU",
),
AuditEvent.Action.PROJECT_STATUS_CHANGED: (
AuditEvent.EvidenceCategory.AC_AU,
"AC,AU",
),
AuditEvent.Action.PROJECT_PI_CHANGED: (
AuditEvent.EvidenceCategory.AC_ACCESS_CONTROL,
"AC",
),
AuditEvent.Action.PROJECT_REVIEW_FORCED: (
AuditEvent.EvidenceCategory.AU_AUDIT_ACCOUNTABILITY,
"AU",
),
AuditEvent.Action.PROJECT_REVIEW_SUBMITTED: (
AuditEvent.EvidenceCategory.AU_AUDIT_ACCOUNTABILITY,
"AU",
),
AuditEvent.Action.PROJECT_REVIEW_COMPLETED: (
AuditEvent.EvidenceCategory.AU_AUDIT_ACCOUNTABILITY,
"AU",
),
AuditEvent.Action.PROJECT_REVIEW_STATUS_CHANGED: (
AuditEvent.EvidenceCategory.AU_AUDIT_ACCOUNTABILITY,
"AU",
),
AuditEvent.Action.RESOURCE_CREATED: (
AuditEvent.EvidenceCategory.CM_CONFIGURATION_MANAGEMENT,
"CM",
),
AuditEvent.Action.RESOURCE_CHANGED: (
AuditEvent.EvidenceCategory.CM_CONFIGURATION_MANAGEMENT,
"CM",
),
AuditEvent.Action.RESOURCE_DELETED: (
AuditEvent.EvidenceCategory.CM_CONFIGURATION_MANAGEMENT,
"CM",
),
}
def _get_request_ip(request):
forwarded_for = request.META.get("HTTP_X_FORWARDED_FOR")
if forwarded_for:
return forwarded_for.split(",")[0].strip() or None
return request.META.get("REMOTE_ADDR") or None
def _get_target_repr(target):
if target._meta.label_lower == "user.userprofile":
return user_profile_label(target)[:255]
try:
return str(target)[:255]
except Exception:
meta = target._meta
pk = getattr(target, "pk", "")
return f"{meta.app_label}.{meta.model_name} object ({pk})"[:255]
def log_event(
action,
target,
old_values=None,
new_values=None,
message="",
actor=None,
request=None,
evidence_category="",
control_family="",
source=AuditEvent.Source.RUNTIME,
source_id="",
is_reconstructed=False,
event_time=None,
target_repr="",
):
if target is None or isinstance(target, AuditEvent):
return None
request = request if request is not None else get_current_request()
actor = actor if actor is not None else get_current_actor()
if actor is not None and getattr(actor, "is_authenticated", False) is False:
actor = None
if not evidence_category or not control_family:
default_evidence_category, default_control_family = ACTION_EVIDENCE_MAP.get(action, ("", ""))
evidence_category = evidence_category or default_evidence_category
control_family = control_family or default_control_family
meta = target._meta
request_path = ""
ip_address = None
if request is not None:
request_path = getattr(request, "path", "")[:255]
ip_address = _get_request_ip(request)
values = {
"actor": actor,
"action": action,
"evidence_category": evidence_category,
"control_family": control_family,
"target_type": f"{meta.app_label}.{meta.model_name}",
"target_id": str(getattr(target, "pk", "") or "")[:128],
"target_repr": (target_repr or _get_target_repr(target))[:255],
"old_values": old_values or {},
"new_values": new_values or {},
"message": str(message or ""),
"source": source,
"source_id": str(source_id or "")[:128],
"is_reconstructed": is_reconstructed,
"request_path": request_path,
"ip_address": ip_address,
}
if event_time is not None:
values["event_time"] = event_time
return AuditEvent.objects.create(**values)