Continuous Monitoring Integration Methodology
Integrate red team findings into continuous monitoring systems for ongoing threat detection and defense validation.
Overview
This methodology walkthrough provides a structured, repeatable framework for doing so — one that you can adapt to your organization's specific context and compliance requirements.
Methodology is the backbone of professional red teaming. Without a structured approach, assessments become ad hoc exercises that miss critical vulnerabilities and produce inconsistent results. This walkthrough establishes a framework that ensures comprehensive coverage while remaining adaptable to different engagement types.
Context and Motivation
This methodology addresses a gap in how organizations approach AI security assessments, illustrated by incidents such as CVE-2023-39659 (LangChain arbitrary code execution via the JSON agent). The framework aligns with the model-assisted red teaming approach of Perez et al. (2022, "Red Teaming Language Models with Language Models") and incorporates lessons learned from real-world engagements.
The AI security landscape evolves rapidly, with new attack techniques and defense mechanisms emerging continuously. A robust methodology must be flexible enough to incorporate new developments while maintaining the rigor needed for professional assessments. This walkthrough provides that balance.
Phase 1: Planning and Scoping
Effective assessment begins with thorough planning. This phase establishes the boundaries, objectives, and success criteria for the engagement.
#!/usr/bin/env python3
"""
Continuous Monitoring Integration Methodology — Methodology Framework
Structured approach for AI red team assessments.
"""
import json
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Optional
class EngagementType(Enum):
FULL_ASSESSMENT = "full_assessment"
FOCUSED_TEST = "focused_test"
COMPLIANCE_AUDIT = "compliance_audit"
CONTINUOUS_MONITORING = "continuous_monitoring"
class Priority(Enum):
CRITICAL = 4
HIGH = 3
MEDIUM = 2
LOW = 1
@dataclass
class EngagementScope:
"""Define the scope and boundaries of an AI red team engagement."""
client: str
engagement_type: EngagementType
start_date: str
end_date: str
target_systems: list = field(default_factory=list)
in_scope_techniques: list = field(default_factory=list)
out_of_scope: list = field(default_factory=list)
objectives: list = field(default_factory=list)
success_criteria: list = field(default_factory=list)
compliance_requirements: list = field(default_factory=list)
contact_info: dict = field(default_factory=dict)
def validate(self) -> list[str]:
"""Validate the engagement scope for completeness."""
issues = []
if not self.target_systems:
issues.append("No target systems defined")
if not self.objectives:
issues.append("No assessment objectives defined")
if not self.success_criteria:
issues.append("No success criteria defined")
if not self.contact_info:
issues.append("No client contact information")
return issues
def to_dict(self) -> dict:
return {
"client": self.client,
"type": self.engagement_type.value,
"dates": f"{self.start_date} to {self.end_date}",
"targets": self.target_systems,
"objectives": self.objectives,
"exclusions": self.out_of_scope,
}
# Example scope definition
scope = EngagementScope(
client="Acme Corp",
engagement_type=EngagementType.FULL_ASSESSMENT,
start_date="2026-03-20",
end_date="2026-04-03",
target_systems=[
"Customer service chatbot (chat.acme.com)",
"Internal knowledge base assistant (kb.internal.acme.com)",
"Code review AI (codereview.acme.com)",
],
in_scope_techniques=[
"Prompt injection (direct and indirect)",
"System prompt extraction",
"Data exfiltration through model outputs",
"Safety bypass and jailbreaking",
"Function calling abuse",
],
out_of_scope=[
"Denial of service attacks",
"Physical infrastructure testing",
"Social engineering of employees",
"Model weight extraction",
],
objectives=[
"Evaluate prompt injection resistance across all target systems",
"Test data exfiltration controls and output filtering",
"Assess function calling security in the code review AI",
"Verify compliance with OWASP LLM Top 10 2025",
],
success_criteria=[
"Complete testing of all in-scope techniques against each target",
"Document all findings with reproducible evidence",
"Deliver final report within 5 business days of testing completion",
],
)
validation_issues = scope.validate()
if validation_issues:
print(f"Scope issues: {validation_issues}")
else:
print("Scope validated successfully")
print(json.dumps(scope.to_dict(), indent=2))
Phase 2: Reconnaissance and Analysis
Systematic reconnaissance maps the target's architecture, models, and defenses before testing begins.
@dataclass
class ReconFinding:
"""A single reconnaissance finding."""
category: str
detail: str
confidence: str # high, medium, low
source: str
priority: Priority
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
class ReconFramework:
"""Structured reconnaissance framework for AI systems."""
def __init__(self, scope: EngagementScope):
self.scope = scope
self.findings: list[ReconFinding] = []
def passive_recon(self, target: str) -> list[ReconFinding]:
"""Gather information without directly interacting with the target."""
checklist = [
("Documentation Review", "Review public docs, API references, and changelogs"),
("Technology Stack", "Identify frameworks, cloud providers, and model providers"),
("Public Disclosures", "Check for prior vulnerability disclosures or bug bounties"),
("Job Postings", "Analyze job postings for technology and architecture clues"),
("Social Media", "Search for engineering blog posts and conference talks"),
]
findings = []
for category, description in checklist:
finding = ReconFinding(
category=category,
detail=f"{description} for {target}",
confidence="medium",
source="passive",
priority=Priority.MEDIUM,
)
findings.append(finding)
self.findings.append(finding)
return findings
def active_recon(self, target: str) -> list[ReconFinding]:
"""Actively probe the target to map its configuration."""
probes = [
("Model Identification", "Fingerprint the underlying model through behavioral analysis"),
("Defense Mapping", "Identify input filters, output sanitizers, and guardrails"),
("API Surface", "Map available endpoints, parameters, and error handling"),
("Rate Limiting", "Determine rate limits and throttling behavior"),
("Session Handling", "Analyze session management and context persistence"),
("Tool/Function Discovery", "Enumerate available tools and function calling capabilities"),
]
findings = []
for category, description in probes:
finding = ReconFinding(
category=category,
detail=f"{description} for {target}",
confidence="high",
source="active",
priority=Priority.HIGH,
)
findings.append(finding)
self.findings.append(finding)
return findings
def generate_attack_plan(self) -> dict:
"""Generate a prioritized attack plan based on recon findings."""
return {
"target_count": len(self.scope.target_systems),
"findings_count": len(self.findings),
"high_priority": sum(1 for f in self.findings if f.priority == Priority.HIGH),
"recommended_approach": "Start with highest-priority targets using "
"techniques most likely to succeed based on recon",
}
recon = ReconFramework(scope)
for target in scope.target_systems:
recon.passive_recon(target)
recon.active_recon(target)
plan = recon.generate_attack_plan()
print(json.dumps(plan, indent=2))
Phase 3: Execution Framework
Execute testing systematically, documenting all findings as they are discovered. The execution phase is where the majority of engagement time is spent and where the quality of previous phases directly impacts efficiency.
| Phase | Time Allocation | Key Activities | Deliverables |
|---|---|---|---|
| Planning | 10-15% | Scoping, scheduling, environment setup | Engagement plan, scope document |
| Reconnaissance | 15-20% | Passive research, active probing, mapping | Recon report, attack plan |
| Testing | 50-60% | Attack execution, validation, iteration | Raw findings, evidence logs |
| Reporting | 15-20% | Analysis, writing, review, delivery | Final report, executive summary |
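The allocation table above can be turned into a concrete schedule. A minimal sketch, using the midpoints of each range (the midpoint percentages and the 80-hour budget are illustrative assumptions, not fixed values; the midpoints slightly exceed 100%, so the sketch normalizes them):

```python
# Midpoints of the allocation ranges above (illustrative assumptions).
PHASE_ALLOCATION = {
    "Planning": 0.125,        # 10-15%
    "Reconnaissance": 0.175,  # 15-20%
    "Testing": 0.55,          # 50-60%
    "Reporting": 0.175,       # 15-20%
}

def schedule_hours(total_hours: float) -> dict[str, float]:
    """Split a total engagement budget into per-phase hour estimates.

    Normalizes by the sum of the midpoints so the estimates add up
    to (approximately) the total budget despite rounding.
    """
    total_pct = sum(PHASE_ALLOCATION.values())
    return {
        phase: round(total_hours * pct / total_pct, 1)
        for phase, pct in PHASE_ALLOCATION.items()
    }

if __name__ == "__main__":
    # A two-week engagement at roughly 80 working hours (assumption).
    print(schedule_hours(80))
```

Testing should always receive the single largest share; if the computed schedule ever shows otherwise, the scoping inputs are wrong.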
class ExecutionTracker:
"""Track execution progress and manage testing workflow."""
def __init__(self, scope: EngagementScope):
self.scope = scope
self.test_cases = []
self.completed = []
self.findings = []
def create_test_matrix(self) -> list[dict]:
"""Generate a test matrix from scope and recon findings."""
techniques = self.scope.in_scope_techniques
targets = self.scope.target_systems
matrix = []
for target in targets:
for technique in techniques:
matrix.append({
"target": target,
"technique": technique,
"priority": Priority.HIGH if "injection" in technique.lower() else Priority.MEDIUM,
"status": "pending",
"assigned_to": None,
"time_estimate_min": 30,
})
self.test_cases = matrix
return matrix
def update_status(self, target: str, technique: str, status: str, finding: Optional[dict] = None):
"""Update the status of a test case."""
for tc in self.test_cases:
if tc["target"] == target and tc["technique"] == technique:
tc["status"] = status
if finding:
self.findings.append(finding)
break
def get_progress(self) -> dict:
"""Get current execution progress."""
total = len(self.test_cases)
completed = sum(1 for tc in self.test_cases if tc["status"] in ("completed", "blocked"))
in_progress = sum(1 for tc in self.test_cases if tc["status"] == "in_progress")
return {
"total_tests": total,
"completed": completed,
"in_progress": in_progress,
"pending": total - completed - in_progress,
"completion_pct": completed / max(total, 1) * 100,
"findings_count": len(self.findings),
}
tracker = ExecutionTracker(scope)
matrix = tracker.create_test_matrix()
print(f"Generated {len(matrix)} test cases across "
f"{len(scope.target_systems)} targets")
# Simulate execution progress
tracker.update_status(
scope.target_systems[0],
scope.in_scope_techniques[0],
"completed",
{"title": "Example finding", "severity": "High"}
)
print(json.dumps(tracker.get_progress(), indent=2))
The execution phase should follow a prioritized order: start with the highest-impact techniques against the most critical targets. If time becomes constrained, this ordering ensures that the most important tests have been completed. Communicate progress regularly with the client through agreed-upon channels, especially if critical findings are discovered that require immediate attention.
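That prioritized ordering can be expressed as a simple sort over the pending entries in the test matrix. A minimal sketch, assuming test-case dictionaries shaped like those produced by `ExecutionTracker` (the `Priority` enum is redeclared and the matrix entries are invented here so the sketch is self-contained):

```python
from enum import Enum

class Priority(Enum):
    CRITICAL = 4
    HIGH = 3
    MEDIUM = 2
    LOW = 1

def prioritized_order(test_cases: list[dict]) -> list[dict]:
    """Order pending test cases so the highest-impact work runs first."""
    pending = [tc for tc in test_cases if tc["status"] == "pending"]
    return sorted(pending, key=lambda tc: tc["priority"].value, reverse=True)

# Illustrative matrix entries (hypothetical targets and techniques).
matrix = [
    {"target": "chatbot", "technique": "Safety bypass",
     "priority": Priority.MEDIUM, "status": "pending"},
    {"target": "chatbot", "technique": "Prompt injection",
     "priority": Priority.HIGH, "status": "pending"},
    {"target": "kb", "technique": "Prompt injection",
     "priority": Priority.HIGH, "status": "completed"},
]
print([tc["technique"] for tc in prioritized_order(matrix)])
```

Completed and blocked cases fall out of the queue automatically, so the same call can be re-run daily to produce the next day's work order.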
Phase 3.5: Finding Validation and Triage
Before moving to reporting, validate all findings to ensure they meet professional quality standards. Unvalidated findings waste client time and damage assessor credibility. Every finding should pass these criteria before inclusion in the final report.
class FindingValidator:
"""Validate and triage findings before reporting."""
SEVERITY_CRITERIA = {
"Critical": {
"min_success_rate": 0.5,
"requires_validation": True,
"impact": "Direct data breach, unauthorized code execution, or safety bypass "
"with real-world harm potential",
},
"High": {
"min_success_rate": 0.3,
"requires_validation": True,
"impact": "System prompt extraction, significant safety bypass, "
"or unauthorized action execution",
},
"Medium": {
"min_success_rate": 0.2,
"requires_validation": True,
"impact": "Information disclosure, partial safety bypass, "
"or limited unauthorized behavior",
},
"Low": {
"min_success_rate": 0.1,
"requires_validation": False,
"impact": "Informational findings, theoretical vulnerabilities, "
"or minimal impact issues",
},
}
def validate_finding(self, finding: dict, test_runs: int = 5) -> dict:
"""Validate a finding meets quality standards."""
issues = []
if not finding.get("title"):
issues.append("Missing title")
if not finding.get("evidence"):
issues.append("Missing evidence")
if not finding.get("remediation"):
issues.append("Missing remediation guidance")
success_rate = finding.get("success_rate", 0)
severity = finding.get("severity", "Medium")
criteria = self.SEVERITY_CRITERIA.get(severity, self.SEVERITY_CRITERIA["Medium"])
if success_rate < criteria["min_success_rate"]:
issues.append(f"Success rate {success_rate:.0%} below minimum "
f"{criteria['min_success_rate']:.0%} for {severity} severity")
return {
"valid": len(issues) == 0,
"issues": issues,
"severity_appropriate": success_rate >= criteria["min_success_rate"],
}
def deduplicate(self, findings: list[dict]) -> list[dict]:
"""Remove duplicate findings that describe the same vulnerability."""
unique = []
seen_techniques = set()
for finding in sorted(findings, key=lambda f: f.get("success_rate", 0), reverse=True):
technique_key = finding.get("technique", "").lower().strip()
if technique_key not in seen_techniques:
unique.append(finding)
seen_techniques.add(technique_key)
else:
# Merge evidence into the existing finding
for existing in unique:
if existing.get("technique", "").lower().strip() == technique_key:
existing.setdefault("additional_evidence", []).append(
finding.get("evidence", "")
)
break
return unique
validator = FindingValidator()
Finding validation is a critical quality gate. In the pressure of a time-boxed engagement, it can be tempting to include every observation as a finding. However, flooding the client with low-quality or unvalidated findings dilutes the impact of genuine critical issues and makes the report harder to act on. Apply strict validation criteria and present only findings that meet the quality bar for their assigned severity level.
Deduplication is equally important. Different team members may discover the same vulnerability through different attack paths. These should be consolidated into a single finding with multiple evidence items, rather than presented as separate findings. This provides a clearer picture of the vulnerability and avoids inflating finding counts.
Phase 4: Reporting and Communication
Transform raw findings into actionable reports that drive remediation. The report is the primary deliverable of the engagement and must communicate effectively to multiple audiences.
@dataclass
class Finding:
"""A validated red team finding."""
title: str
severity: str # Critical, High, Medium, Low, Informational
description: str
evidence: str
impact: str
remediation: list[str]
references: list[str]
cvss_score: Optional[float] = None
owasp_mapping: Optional[str] = None
mitre_atlas: Optional[str] = None
reproducible: bool = True
success_rate: float = 0.0
class ReportGenerator:
"""Generate professional red team assessment reports."""
def __init__(self, scope: EngagementScope):
self.scope = scope
self.findings: list[Finding] = []
def add_finding(self, finding: Finding):
self.findings.append(finding)
def generate_executive_summary(self) -> str:
critical = sum(1 for f in self.findings if f.severity == "Critical")
high = sum(1 for f in self.findings if f.severity == "High")
medium = sum(1 for f in self.findings if f.severity == "Medium")
low = sum(1 for f in self.findings if f.severity == "Low")
return f"""
## Executive Summary
Red team assessment of {self.scope.client} AI systems conducted
{self.scope.start_date} through {self.scope.end_date}.
**Overall Risk**: {"Critical" if critical > 0 else "High" if high > 0 else "Medium"}
| Severity | Count |
|----------|-------|
| Critical | {critical} |
| High | {high} |
| Medium | {medium} |
| Low | {low} |
| **Total** | **{len(self.findings)}** |
**Key Findings**: {len(self.findings)} vulnerabilities identified across
{len(self.scope.target_systems)} target systems. Immediate remediation
recommended for all Critical and High severity findings.
"""
def generate_full_report(self) -> str:
sections = [self.generate_executive_summary()]
for i, finding in enumerate(self.findings, 1):
sections.append(f"""
### Finding {i}: {finding.title}
**Severity**: {finding.severity}
**Reproducible**: {"Yes" if finding.reproducible else "No"}
**Success Rate**: {finding.success_rate:.0%}
**Description**: {finding.description}
**Evidence**: {finding.evidence}
**Impact**: {finding.impact}
**Remediation**:
{chr(10).join(f'{j}. {r}' for j, r in enumerate(finding.remediation, 1))}
""")
return "\n".join(sections)
# Example report generation
report = ReportGenerator(scope)
report.add_finding(Finding(
title="System Prompt Extraction via Role-Play",
severity="High",
description="The customer service chatbot revealed its full system prompt "
"when a role-play technique was used to request configuration details.",
evidence="See Appendix A for full request/response logs",
impact="Attackers can understand the full system configuration, enabling "
"targeted attacks against specific safety measures.",
remediation=[
"Implement output filtering for system prompt content",
"Add instruction hierarchy enforcement to resist role-play overrides",
"Deploy monitoring for system prompt extraction attempts",
],
references=["OWASP LLM01", "MITRE ATLAS AML.T0051"],
owasp_mapping="LLM01 — Prompt Injection",
mitre_atlas="AML.T0051",
reproducible=True,
success_rate=0.8,
))
print(report.generate_full_report())
Phase 5: Post-Engagement Activities
The engagement does not end with report delivery. Post-engagement activities ensure that findings drive actual security improvements and that the methodology evolves based on lessons learned.
Findings presentation: Schedule a findings walkthrough meeting with both technical and leadership stakeholders. Walk through each finding with live demonstrations where possible. Executives need to understand business risk; engineers need to understand root causes and remediation steps. Prepare different presentation decks for each audience.
Remediation support: Offer to validate remediation implementations. When the client's engineering team deploys fixes, re-test the specific findings to verify that the vulnerabilities have been addressed. This retesting phase is typically shorter than the original engagement and provides valuable assurance that fixes are effective.
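The re-testing pass can reuse the original findings as its test plan. A minimal sketch of remediation verification, where each finding carries its original success rate and a `retest` callable stands in for actually re-running the attack (both the callable and the zero-rate threshold for "remediated" are assumptions for illustration):

```python
from typing import Callable

def verify_remediation(findings: list[dict],
                       retest: Callable[[dict], float]) -> list[dict]:
    """Re-run each finding's attack and record whether the fix held.

    A finding is treated as remediated when the retest success rate
    drops to zero; a lower-but-nonzero rate is flagged as a partial
    fix that needs follow-up.
    """
    results = []
    for finding in findings:
        new_rate = retest(finding)
        results.append({
            "title": finding["title"],
            "original_rate": finding["success_rate"],
            "retest_rate": new_rate,
            "status": ("remediated" if new_rate == 0.0
                       else "partially_fixed" if new_rate < finding["success_rate"]
                       else "not_fixed"),
        })
    return results

# Illustrative stub: pretend every fix fully held.
findings = [{"title": "System Prompt Extraction via Role-Play",
             "success_rate": 0.8}]
print(verify_remediation(findings, retest=lambda f: 0.0))
```

Reporting the original and retest rates side by side gives the client direct evidence that the fix changed attacker outcomes, not just the code.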
Lessons learned: Conduct an internal lessons learned session with the assessment team. Document what worked, what did not, and how the methodology should be adjusted for future engagements. Key questions to address include:
| Question | Purpose |
|---|---|
| Which techniques were most effective against this target type? | Improve technique selection for similar future engagements |
| Were there any scope gaps that missed important attack surfaces? | Refine scoping methodology |
| Did the time allocation match actual effort distribution? | Improve estimation for future engagements |
| Were there any tooling limitations that impacted coverage? | Drive tool development priorities |
| Did the reporting format effectively communicate findings? | Refine report templates |
Knowledge base updates: Update the internal knowledge base with new findings, effective payloads, and defense bypass techniques discovered during the engagement. This institutional knowledge benefits future assessment teams and ensures that the organization continuously improves its red team capabilities.
Client relationship: Follow up with the client after a reasonable remediation window (typically 30-60 days) to check on remediation progress. This demonstrates ongoing commitment to their security posture and can lead to follow-on engagements for verification testing or expanded scope assessments.
References and Further Reading
- CVE-2023-39659 — LangChain arbitrary code execution via json agent
- Perez et al. 2022 — "Red Teaming Language Models with Language Models"
- OWASP LLM Top 10 2025 — LLM01 (Prompt Injection)
- Wei et al. 2023 — "Jailbroken: How Does LLM Safety Training Fail?"
- JailbreakBench — github.com/JailbreakBench/jailbreakbench — benchmark suite
- Inspect AI (UK AISI) — github.com/UKGovernmentBEIS/inspect_ai — AI safety evaluations
Review Questions
- Why should reconnaissance receive 15-20% of total engagement time even in time-boxed assessments?
- What is the most important factor when deciding between fail-open and fail-closed for a defense component?