# cve-decoder / app.py
# MMADS's picture
# final cosmetic changes
# 6b7cbd6
"""CVE Dashboard - Real-time vulnerability monitoring with NVD API and LLM-powered audience customization."""
import os
import json
import time
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Tuple
import gradio as gr
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import requests
# Configure logging: set up the root logger so records at INFO level and
# above are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module; the fetch and summary code
# below logs progress and warnings through it.
logger = logging.getLogger(__name__)
# Audience profiles used to tailor CVE descriptions.
# Each entry maps an audience name to three fields that are interpolated into
# the LLM prompt and shown in the UI:
#   focus      - what the rewritten summary should concentrate on
#   tone       - the writing register requested from the model
#   priorities - ordered list of topics to highlight
AUDIENCE_PROFILES = {
    "Cybersecurity Professional": dict(
        focus="threat assessment, attack vectors, mitigation strategies, and security controls",
        tone="technical and precise",
        priorities=[
            "exploitation methods",
            "defensive measures",
            "risk assessment",
            "compliance implications",
        ],
    ),
    "Data Scientist": dict(
        focus="data exposure risks, model vulnerabilities, and statistical analysis implications",
        tone="analytical and research-oriented",
        priorities=[
            "data integrity",
            "model security",
            "pipeline vulnerabilities",
            "privacy concerns",
        ],
    ),
    "Data Engineer": dict(
        focus="infrastructure vulnerabilities, data pipeline security, and system architecture impacts",
        tone="technical with infrastructure emphasis",
        priorities=[
            "database security",
            "ETL vulnerabilities",
            "infrastructure risks",
            "data flow security",
        ],
    ),
    "Full-Stack Developer": dict(
        focus="code vulnerabilities, dependency risks, and implementation fixes",
        tone="practical and code-oriented",
        priorities=[
            "code examples",
            "library updates",
            "patch implementation",
            "secure coding practices",
        ],
    ),
    "Product Owner": dict(
        focus="business impact, user experience, and prioritization for backlog",
        tone="business-oriented with technical context",
        priorities=[
            "user impact",
            "feature implications",
            "timeline considerations",
            "resource requirements",
        ],
    ),
    "Manager": dict(
        focus="business risk, resource allocation, and strategic implications",
        tone="executive summary style",
        priorities=[
            "business impact",
            "cost implications",
            "team requirements",
            "timeline urgency",
        ],
    ),
}
class CVEDashboard:
    """Fetch CVE records from the NVD REST API and turn them into tables and charts.

    The class talks to the NVD "cves/2.0" endpoint, flattens each raw record
    into a small dict (see ``_process_cve``) and builds plotly figures and a
    pandas table from lists of those dicts.
    """

    # Colour assigned to each severity label; shared by every chart.
    _SEVERITY_COLORS = {
        'CRITICAL': '#d32f2f',
        'HIGH': '#f57c00',
        'MEDIUM': '#fbc02d',
        'LOW': '#388e3c',
        'UNKNOWN': '#9e9e9e'
    }

    # Span of one publication-date window. Kept just under the API's
    # 120-day limit on date-range queries.
    _MAX_WINDOW = timedelta(days=119)

    # Metric lists inspected for a score, in order of preference. CVSS 4.0 is
    # tried last so records that already resolved via 3.x/2.0 are unaffected.
    _CVSS_METRIC_KEYS = ('cvssMetricV31', 'cvssMetricV30', 'cvssMetricV2', 'cvssMetricV40')

    def __init__(self):
        """Initialize the CVE Dashboard from environment configuration."""
        self.api_key = os.getenv('NVD_API_KEY')
        self.base_url = "https://services.nvd.nist.gov/rest/json/cves/2.0"
        self.headers = {'apiKey': self.api_key} if self.api_key else {}
        # Kept for compatibility; no method of this class reads or writes it.
        self.cache = {}
        self.last_request_time = 0
        # Seconds between requests: shorter when an API key is configured.
        self.rate_limit_delay = 0.7 if self.api_key else 6
        # HuggingFace token - try environment first
        self.hf_token = os.getenv('HF_TOKEN') or os.getenv('HUGGINGFACE_TOKEN')

    def _rate_limit(self):
        """Sleep as needed so consecutive requests are ``rate_limit_delay`` apart."""
        elapsed = time.time() - self.last_request_time
        if elapsed < self.rate_limit_delay:
            time.sleep(self.rate_limit_delay - elapsed)
        self.last_request_time = time.time()

    @staticmethod
    def _date_windows(range_start, range_end, max_span):
        """Yield contiguous ``(start, end)`` datetime pairs covering the range.

        Each window ends one second before the next one starts, so together
        with the ``.000``/``.999`` millisecond suffixes used when formatting
        the request there is neither a gap nor an overlap between windows.
        Nothing is yielded when ``range_start`` is not before ``range_end``.
        """
        one_second = timedelta(seconds=1)
        cursor = range_start
        while cursor < range_end:
            window_end = min(cursor + max_span - one_second, range_end)
            yield cursor, window_end
            cursor = window_end + one_second

    def _fetch_window(self, window_start, window_end, keyword, page_size) -> List[Dict]:
        """Download every result page for one publication-date window.

        Pages are requested with an increasing ``startIndex`` until the
        reported ``totalResults`` has been collected or an empty page comes
        back. A 404 is treated as "no data for this window".

        Raises:
            requests.exceptions.RequestException: on transport or HTTP errors.
        """
        params = {
            # Format dates with timezone information (Z for UTC)
            'pubStartDate': window_start.strftime('%Y-%m-%dT%H:%M:%S.000Z'),
            'pubEndDate': window_end.strftime('%Y-%m-%dT%H:%M:%S.999Z'),
            'resultsPerPage': page_size
        }
        if keyword:
            params['keywordSearch'] = keyword

        collected = []
        start_index = 0
        while True:
            self._rate_limit()
            params['startIndex'] = start_index
            logger.info(
                f"Fetching CVEs from {window_start.date()} to {window_end.date()} "
                f"(startIndex={start_index})"
            )
            response = requests.get(
                self.base_url,
                headers=self.headers,
                params=params,
                timeout=30
            )
            if response.status_code == 404:
                logger.warning(f"No data found for date range {window_start.date()} to {window_end.date()}")
                break
            if response.status_code != 200:
                response.raise_for_status()

            data = response.json()
            page = data.get('vulnerabilities', [])
            collected.extend(page)
            start_index += len(page)
            # The empty-page check guards against looping forever if the
            # reported total is never reached.
            if not page or start_index >= data.get('totalResults', 0):
                break
        return collected

    def fetch_cves(self,
                   year: int,
                   keyword: Optional[str] = None,
                   severity: Optional[str] = None,
                   results_per_page: int = 2000) -> Tuple[List[Dict], str]:
        """
        Fetch all CVEs published in a given year from the NVD API.

        The year is split into contiguous windows that respect the 120-day
        range limit, every window is paginated, and the range is clamped so
        it never extends past the current UTC time.

        Args:
            year: The year to fetch CVEs for.
            keyword: Optional keyword to search
            severity: Optional severity filter (LOW, MEDIUM, HIGH, CRITICAL),
                applied client-side to the processed records.
            results_per_page: Number of results per page (max 2000)

        Returns:
            Tuple of (list of processed CVEs, status message). Errors are
            reported through the status message with an empty list; this
            method does not raise.
        """
        from datetime import timezone

        try:
            # Request timestamps are sent with a "Z" suffix, so compute "now"
            # in UTC (as a naive value to match the other datetimes here).
            now = datetime.now(timezone.utc).replace(tzinfo=None)
            year_start = datetime(year, 1, 1)
            # End today for the current (or a future) year, else at year end.
            year_end = min(datetime(year, 12, 31, 23, 59, 59), now)
            page_size = max(1, min(results_per_page, 2000))

            all_vulnerabilities = []
            for window_start, window_end in self._date_windows(year_start, year_end, self._MAX_WINDOW):
                all_vulnerabilities.extend(
                    self._fetch_window(window_start, window_end, keyword, page_size)
                )

            # Process and filter all aggregated CVEs
            processed_cves = []
            for vuln in all_vulnerabilities:
                cve = self._process_cve(vuln.get('cve', {}))
                if severity and cve['severity'] != severity:
                    continue
                processed_cves.append(cve)

            filters = (f" matching '{keyword}'" if keyword else "") + (f" with {severity} severity" if severity else "")
            if not processed_cves:
                return [], f"No CVEs found for year {year}" + filters
            return processed_cves, f"✓ Fetched {len(processed_cves)} CVEs from the year {year}" + filters
        except requests.exceptions.RequestException as e:
            error_details = ""
            response = getattr(e, 'response', None)
            if response is not None:
                try:
                    error_data = response.json()
                    # The error body is not guaranteed to be a JSON object.
                    if isinstance(error_data, dict):
                        error_details = f" - {error_data.get('message', response.text)}"
                    else:
                        error_details = f" - {response.text}"
                except ValueError:  # body is not valid JSON
                    error_details = f" - Status: {response.status_code}, Response: {response.text[:200]}"
            return [], f"✗ API Error: {str(e)}{error_details}"
        except Exception as e:
            # UI boundary: report the failure instead of crashing the handler.
            logger.exception("Unexpected error while fetching CVEs")
            return [], f"✗ Error: {str(e)}"

    def _process_cve(self, cve_data: Dict) -> Dict:
        """Flatten one raw ``cve`` object into the dict used by the dashboard.

        Returns a dict with the keys ``id``, ``description``,
        ``display_description`` (truncated to 500 characters), ``severity``,
        ``score``, ``published``, ``modified``, ``references`` (at most five
        URLs), ``cvss_version`` and ``vector_string``. Missing fields fall
        back to placeholder values rather than raising.
        """
        cve_id = cve_data.get('id', 'Unknown')

        # Extract the first non-empty English description
        description = next(
            (d['value'] for d in cve_data.get('descriptions', [])
             if d.get('lang') == 'en' and d.get('value')),
            'No description available'
        )

        # Extract CVSS metrics and severity from the first available version
        metrics = cve_data.get('metrics', {})
        cvss_data = {}
        severity = 'UNKNOWN'
        score = 0.0
        for metric_key in self._CVSS_METRIC_KEYS:
            entries = metrics.get(metric_key)
            if entries:
                metric = entries[0]
                cvss_data = metric.get('cvssData', {})
                score = cvss_data.get('baseScore', 0.0)
                # CVSS v2 entries carry baseSeverity on the metric itself
                # rather than inside cvssData, so check both places.
                severity = (
                    cvss_data.get('baseSeverity')
                    or metric.get('baseSeverity')
                    or 'UNKNOWN'
                )
                break

        # Extract references (limit to 5)
        ref_urls = [ref.get('url', '') for ref in cve_data.get('references', [])[:5]]

        # Extract dates; only the YYYY-MM-DD prefix is kept
        published = cve_data.get('published', '')
        modified = cve_data.get('lastModified', '')

        return {
            'id': cve_id,
            'description': description,  # Keep full description for LLM processing
            'display_description': description[:500] + '...' if len(description) > 500 else description,
            'severity': severity,
            'score': score,
            'published': published[:10] if published else 'Unknown',
            'modified': modified[:10] if modified else 'Unknown',
            'references': ref_urls,
            'cvss_version': cvss_data.get('version', 'Unknown'),
            'vector_string': cvss_data.get('vectorString', 'N/A')
        }

    @staticmethod
    def _empty_figure() -> "go.Figure":
        """Return a blank figure carrying a centred "No data available" note."""
        fig = go.Figure()
        fig.add_annotation(text="No data available",
                           xref="paper", yref="paper",
                           x=0.5, y=0.5, showarrow=False)
        return fig

    def create_severity_chart(self, cves: List[Dict]) -> "go.Figure":
        """Create a pie chart of CVE severities."""
        if not cves:
            return self._empty_figure()

        severity_counts = pd.DataFrame(cves)['severity'].value_counts()
        fig = px.pie(
            values=severity_counts.values,
            names=severity_counts.index,
            title="CVE Distribution by Severity",
            color=severity_counts.index,
            color_discrete_map=self._SEVERITY_COLORS
        )
        fig.update_traces(textposition='inside', textinfo='percent+label')
        fig.update_layout(height=400)
        return fig

    def create_timeline_chart(self, cves: List[Dict]) -> "go.Figure":
        """Create a stacked bar chart of CVE publications per day and severity."""
        if not cves:
            return self._empty_figure()

        df = pd.DataFrame(cves)
        # Records without a publication date carry the placeholder 'Unknown';
        # coerce those to NaT and leave them out instead of failing to parse.
        df['published'] = pd.to_datetime(df['published'], errors='coerce')
        df = df.dropna(subset=['published'])
        if df.empty:
            return self._empty_figure()

        # Group by date and severity
        timeline_data = df.groupby([df['published'].dt.date, 'severity']).size().reset_index(name='count')
        fig = px.bar(
            timeline_data,
            x='published',
            y='count',
            color='severity',
            title="CVE Publications Timeline",
            color_discrete_map=self._SEVERITY_COLORS
        )
        fig.update_layout(
            xaxis_title="Publication Date",
            yaxis_title="Number of CVEs",
            height=400,
            hovermode='x unified'
        )
        return fig

    def create_score_distribution(self, cves: List[Dict]) -> "go.Figure":
        """Create a histogram of CVSS scores (records scored 0 are left out)."""
        if not cves:
            return self._empty_figure()

        scores = [cve['score'] for cve in cves if cve['score'] > 0]
        fig = go.Figure(data=[go.Histogram(
            x=scores,
            nbinsx=20,
            marker_color='#1976d2'
        )])
        fig.update_layout(
            title="CVSS Score Distribution",
            xaxis_title="CVSS Score",
            yaxis_title="Count",
            height=400,
            showlegend=False
        )
        # Add severity range annotations
        fig.add_vrect(x0=0, x1=3.9, fillcolor="green", opacity=0.1, annotation_text="Low")
        fig.add_vrect(x0=4, x1=6.9, fillcolor="yellow", opacity=0.1, annotation_text="Medium")
        fig.add_vrect(x0=7, x1=8.9, fillcolor="orange", opacity=0.1, annotation_text="High")
        fig.add_vrect(x0=9, x1=10, fillcolor="red", opacity=0.1, annotation_text="Critical")
        return fig

    def format_cve_table(self, cves: List[Dict]) -> pd.DataFrame:
        """Format CVEs for display in a table (empty frame for no CVEs)."""
        if not cves:
            return pd.DataFrame()

        # Select and reorder columns, then give them display names
        columns = ['id', 'severity', 'score', 'published', 'display_description']
        df = pd.DataFrame(cves)[columns]
        return df.rename(columns={
            'id': 'CVE ID',
            'severity': 'Severity',
            'score': 'CVSS Score',
            'published': 'Published',
            'display_description': 'Description'
        })
def _extract_error_message(response) -> str:
    """Return the ``error`` field of an error response, or its raw text.

    Falls back to ``response.text`` when the body is not JSON or is JSON but
    not an object, so logging an error can never raise itself.
    """
    try:
        body = response.json()
    except ValueError:  # body is not valid JSON
        return response.text
    if isinstance(body, dict):
        return body.get("error", response.text)
    return response.text


def generate_tailored_summary(cve_description: str, audience: str, hf_token: Optional[str] = None, max_retries: int = 2) -> str:
    """
    Generates a tailored CVE summary using google/gemma-2-2b-it via the
    HuggingFace OpenAI-compatible chat-completions router endpoint.

    Args:
        cve_description: The original CVE description (only the first 1200
            characters are sent to the model)
        audience: Target audience; must be a key of AUDIENCE_PROFILES
        hf_token: HuggingFace API token (optional if HF_TOKEN or
            HUGGINGFACE_TOKEN is set in the environment)
        max_retries: Maximum number of attempts per model

    Returns:
        The tailored summary prefixed with a bold heading on success. On
        failure a message starting with "❌" (invalid input, rejected token,
        unknown model) or "⏳" (no model produced a usable answer). This
        function does not raise for HTTP or network errors.
    """
    # Use provided token or fall back to environment variable
    token = hf_token or os.getenv('HF_TOKEN') or os.getenv('HUGGINGFACE_TOKEN')
    if not token:
        return "❌ HuggingFace API token is required. Please set HF_TOKEN environment variable or enter your token."
    if not cve_description or not audience:
        return "❌ Please select a CVE and an audience first."
    if audience not in AUDIENCE_PROFILES:
        return f"❌ Unknown audience: {audience}"

    # Define the model(s) to use
    models = [
        "google/gemma-2-2b-it",
    ]
    headers = {"Authorization": f"Bearer {token}"}
    profile = AUDIENCE_PROFILES[audience]
    priorities = ', '.join(profile['priorities'])

    # Gemma has no separate system role, so the instructions and the CVE text
    # are combined into a single user turn.
    full_prompt = (
        "You are an expert cybersecurity analyst.\n"
        f"Rewrite this CVE description for a {audience}.\n"
        f"**Target Audience:** {audience}\n"
        f"**Focus:** {profile['focus']}\n"
        f"**Tone:** {profile['tone']}\n"
        f"**Key Priorities:** {priorities}\n"
        "**CVE Description:**\n"
        f"{cve_description[:1200]}\n"
        "Provide a concise, actionable summary (2-3 sentences) highlighting what matters most to this audience. Focus on practical implications and next steps."
    )

    # Use the OpenAI-compatible messages format
    messages = [
        {"role": "user", "content": full_prompt}
    ]
    # Use the new, standardized router endpoint
    api_url = "https://router.huggingface.co/v1/chat/completions"

    for model in models:
        payload = {
            "model": model,
            "messages": messages,
            "max_tokens": 250,
            "temperature": 0.7,
            "top_p": 0.95,
            "stop": ["<end_of_turn>", "<start_of_turn>"]  # Stop sequences for Gemma
        }
        for attempt in range(max_retries):
            try:
                logger.info(f"Generating summary with {model} (attempt {attempt + 1})")
                response = requests.post(api_url, headers=headers, json=payload, timeout=45)
                status = response.status_code

                if status == 200:
                    try:
                        result = response.json()
                    except ValueError as e:  # body is not valid JSON
                        logger.warning(f"JSON decode error with {model}: {e}")
                        continue

                    # OpenAI-compatible response parsing
                    summary = ""
                    choices = result.get("choices") if isinstance(result, dict) else None
                    if choices:
                        message = choices[0].get("message") or {}
                        # "content" may be present but null.
                        summary = (message.get("content") or "").strip()

                    if summary and len(summary) > 20:
                        logger.info(f"Successfully generated summary with {model}")
                        return f"**{audience} Summary (via {model.split('/')[-1]}):**\n{summary}"
                    # Handle cases where the model returns an empty summary
                    logger.warning(f"Model {model} returned an empty or short summary.")
                    continue  # Retry if possible

                if status == 503:
                    logger.warning(f"Model {model} is loading, trying next model...")
                    break  # Try next model

                if status == 429:
                    if attempt < max_retries - 1:
                        time.sleep(5)  # back off before retrying a rate-limited call
                        continue
                    break

                error_message = _extract_error_message(response)
                logger.warning(f"HTTP {status} with {model}: {error_message}")
                # A rejected token fails the same way for every model and
                # every retry, so report it instead of claiming "busy".
                if status in (401, 403):
                    return f"❌ HuggingFace rejected the API token (HTTP {status}). Please check that the token is valid."
                # If the model is not found or there's a validation error, don't retry.
                if status in (404, 422):
                    return f"❌ Model '{model}' not found or request is invalid. Please check the model name."
                break
            except requests.exceptions.Timeout:
                logger.warning(f"Timeout with {model} on attempt {attempt + 1}")
                if attempt >= max_retries - 1:
                    break  # All attempts for this model timed out
            except requests.exceptions.RequestException as e:
                logger.error(f"Request failed with {model}: {e}")
                break

    return "⏳ AI models are currently busy. This can happen during peak usage. Please try again in a few minutes."
def create_interface():
    """Create the Gradio interface.

    Builds the Blocks layout (search controls, overview charts, CVE table,
    AI summary tab and About tab), wires the event handlers to a fresh
    ``CVEDashboard`` instance and returns the ``gr.Blocks`` object without
    launching it. A first fetch is also registered to run on page load.
    """
    dashboard = CVEDashboard()
    default_audience = "Cybersecurity Professional"

    # ---- helpers and event handlers -------------------------------------

    def find_selected_cve(selected_cve, cves):
        """Return the CVE dict matching a dropdown label, or None.

        Labels have the form "CVE-2024-1234 (HIGH, 7.5)"; the id is the part
        before the first " (".
        """
        cve_id = selected_cve.split(" (")[0]
        return next((cve for cve in cves if cve['id'] == cve_id), None)

    def fetch_and_display(year, keyword_search, severity):
        """Fetch CVEs and update all displays."""
        cves, status = dashboard.fetch_cves(
            year=year,
            keyword=keyword_search if keyword_search else None,
            severity=severity if severity else None
        )
        if cves:
            # Update CVE selector choices
            cve_choices = [f"{cve['id']} ({cve['severity']}, {cve['score']})" for cve in cves]
            return (
                cves,  # Update state
                status,
                dashboard.format_cve_table(cves),
                dashboard.create_severity_chart(cves),
                dashboard.create_timeline_chart(cves),
                dashboard.create_score_distribution(cves),
                gr.Dropdown(choices=cve_choices, value=cve_choices[0] if cve_choices else None)  # Update CVE selector
            )

        empty_fig = go.Figure()
        empty_fig.add_annotation(
            text="No data available",
            xref="paper", yref="paper",
            x=0.5, y=0.5, showarrow=False
        )
        return (
            [],  # Update state
            status,
            pd.DataFrame(),
            empty_fig,
            empty_fig,
            empty_fig,
            gr.Dropdown(choices=[], value=None)  # Clear CVE selector
        )

    def update_audience_info(audience):
        """Update audience information display."""
        if audience in AUDIENCE_PROFILES:
            profile = AUDIENCE_PROFILES[audience]
            return f"**Focus:** {profile['focus']}\n\n**Priorities:** {', '.join(profile['priorities'])}"
        return "Select an audience to see details"

    def update_cve_description(selected_cve, cves):
        """Update the original CVE description when a CVE is selected."""
        if not selected_cve or not cves:
            return ""
        cve = find_selected_cve(selected_cve, cves)
        return cve['description'] if cve is not None else "CVE description not found"

    def generate_summary_with_status(selected_cve, audience, token, cves):
        """Generate audience-specific summary with status updates.

        This is a generator: each yielded ``(summary_text, status_text)``
        pair updates the two output boxes. Every outcome, including the
        validation failures, must therefore be yielded rather than returned.
        """
        if not selected_cve or not audience or not cves:
            yield "Please select a CVE and audience first.", "❌ Missing selection"
            return

        cve = find_selected_cve(selected_cve, cves)
        if cve is None:
            yield "CVE not found", "❌ CVE not found"
            return

        # Update status to show generation in progress
        yield "Generating AI summary... This may take 30-60 seconds.", "🔄 Generating..."
        summary = generate_tailored_summary(cve['description'], audience, token)
        if summary.startswith("❌"):
            outcome = "❌ Generation failed"
        elif summary.startswith("⏳"):
            outcome = "⏳ Models busy"
        else:
            outcome = "✅ Summary generated"
        yield summary, outcome

    # ---- layout ----------------------------------------------------------

    with gr.Blocks(title="CVE Dashboard", theme=gr.themes.Soft()) as interface:
        # State to store fetched CVEs
        cve_state = gr.State([])
        gr.Markdown(
            "\n"
            "# 🛡️ CVE Dashboard with AI-Powered Audience Customization\n"
            "Real-time vulnerability monitoring using NIST National Vulnerability Database (NVD) with LLM-powered audience-specific summaries\n"
        )
        with gr.Row():
            with gr.Column(scale=1):
                hf_token = gr.State(dashboard.hf_token)
                gr.Markdown("### 🔍 Search Parameters")
                current_year = datetime.now().year
                # Default to previous year to ensure we have data
                default_year = current_year - 1
                year_filter = gr.Dropdown(
                    choices=list(range(current_year, current_year - 10, -1)),
                    value=default_year,
                    label="Year"
                )
                keyword = gr.Textbox(
                    label="Keyword Search (Optional)",
                    placeholder="e.g., Apache, Linux, Microsoft"
                )
                severity_filter = gr.Dropdown(
                    choices=[None, "CRITICAL", "HIGH", "MEDIUM", "LOW"],
                    label="Severity Filter",
                    value=None
                )
                fetch_btn = gr.Button("🔍 Fetch CVEs", variant="primary")
            with gr.Column(scale=3):
                status_text = gr.Textbox(label="Status", interactive=False)
                with gr.Tabs():
                    with gr.Tab("📊 Overview"):
                        with gr.Row():
                            severity_chart = gr.Plot(label="Severity Distribution")
                            timeline_chart = gr.Plot(label="Timeline")
                        score_chart = gr.Plot(label="CVSS Score Distribution")
                    with gr.Tab("📋 CVE List"):
                        cve_table = gr.DataFrame(
                            label="CVE Details",
                            wrap=True,
                            row_count=15
                        )
                    with gr.Tab("🤖 AI-Powered Summaries"):
                        gr.Markdown("### Generate Audience-Specific CVE Summaries")
                        with gr.Row():
                            with gr.Column():
                                cve_selector = gr.Dropdown(
                                    label="Select CVE",
                                    choices=[],
                                    info="Choose a CVE from the fetched results"
                                )
                                audience_selector = gr.Dropdown(
                                    label="Target Audience",
                                    choices=list(AUDIENCE_PROFILES.keys()),
                                    value=default_audience,
                                    info="Select the professional perspective"
                                )
                                generate_btn = gr.Button("🧠 Generate AI Summary", variant="primary")
                                # Add status for generation
                                generation_status = gr.Textbox(
                                    label="Generation Status",
                                    value="Ready to generate summaries",
                                    interactive=False
                                )
                            with gr.Column():
                                # Derived from the profile table so the initial
                                # text always matches the selected audience.
                                audience_info = gr.Markdown(
                                    value=update_audience_info(default_audience)
                                )
                        original_description = gr.Textbox(
                            label="Original CVE Description",
                            lines=4,
                            interactive=False
                        )
                        tailored_summary = gr.Textbox(
                            label="AI-Generated Summary",
                            lines=6,
                            interactive=False,
                            placeholder="Select a CVE and audience, then click 'Generate AI Summary'"
                        )
                    with gr.Tab("ℹ️ About"):
                        gr.Markdown(
                            "\n"
                            "### About this Dashboard\n"
                            "This dashboard provides real-time monitoring of [Common Vulnerabilities and Exposures (CVEs)](https://en.wikipedia.org/wiki/Common_Vulnerabilities_and_Exposures)\n"
                            "from the NIST National Vulnerability Database with AI-powered audience customization.\n"
                            "**Features:**\n"
                            "- Search CVEs by date range and keywords\n"
                            "- Filter by severity levels\n"
                            "- Visualize CVE distributions and trends\n"
                            "- AI-powered audience-specific summaries using the google/gemma-2-2b-it model.\n"
                            "**Severity Levels:**\n"
                            "- **CRITICAL** (9.0-10.0): Complete system compromise possible\n"
                            "- **HIGH** (7.0-8.9): Significant impact, immediate patching recommended\n"
                            "- **MEDIUM** (4.0-6.9): Moderate impact, plan for updates\n"
                            "- **LOW** (0.1-3.9): Minor impact, update in regular cycle\n"
                            "**Supported Audiences:**\n"
                            "- **Cybersecurity Professional:** Focus on threats, attack vectors, and mitigation\n"
                            "- **Data Scientist:** Emphasis on data risks and model vulnerabilities\n"
                            "- **Data Engineer:** Infrastructure security and pipeline risks\n"
                            "- **Full-Stack Developer:** Code vulnerabilities and implementation fixes\n"
                            "- **Product Owner:** Business impact and prioritization guidance\n"
                            "- **Manager:** Executive summary with business implications\n"
                            "**Data Source:** [NIST NVD API](https://nvd.nist.gov/developers/vulnerabilities)\n"
                            "**AI Model:** [google/gemma-2-2b-it](https://huggingface.co/google/gemma-2-2b-it)\n"
                            "**Disclaimer:** Generated content may be inaccurate or false.\n"
                            "The free community tier of the Hugging Face Inference API powers this app's AI features.\n"
                            "Since computing resources are shared, anticipate some delay on your initial request as the model loads. Later requests usually process more quickly.\n"
                            "**Note:** If you encounter rate limiting or timeouts, please try again after a short wait.\n"
                            "**Developed by** [M. Murat Ardag](https://mmuratardag.github.io/).\n"
                        )

        # ---- wiring ------------------------------------------------------

        fetch_inputs = [year_filter, keyword, severity_filter]
        fetch_outputs = [cve_state, status_text, cve_table, severity_chart, timeline_chart, score_chart, cve_selector]

        fetch_btn.click(
            fn=fetch_and_display,
            inputs=fetch_inputs,
            outputs=fetch_outputs
        )
        audience_selector.change(
            fn=update_audience_info,
            inputs=[audience_selector],
            outputs=[audience_info]
        )
        cve_selector.change(
            fn=update_cve_description,
            inputs=[cve_selector, cve_state],
            outputs=[original_description]
        )
        generate_btn.click(
            fn=generate_summary_with_status,
            inputs=[cve_selector, audience_selector, hf_token, cve_state],
            outputs=[tailored_summary, generation_status]
        )
        # Load initial data
        interface.load(
            fn=fetch_and_display,
            inputs=fetch_inputs,
            outputs=fetch_outputs
        )
    return interface
if __name__ == "__main__":
    # Report which optional credentials were found in the environment so the
    # operator can tell from the console how the app will behave.
    if os.getenv('NVD_API_KEY'):
        print("✓ NVD API key loaded - Higher rate limits enabled")
    else:
        print("⚠ No NVD API key found - Using lower rate limits")
        print(" Get a free API key at: https://nvd.nist.gov/developers/request-an-api-key")

    if os.getenv('HF_TOKEN') or os.getenv('HUGGINGFACE_TOKEN'):
        print("✓ HuggingFace token loaded - AI summaries enabled")
    else:
        # The interface has no token input field, so without the environment
        # variable the summary feature cannot work.
        print("⚠ No HuggingFace token found - AI summaries are disabled until HF_TOKEN is set")
        print(" Get a free token at: https://huggingface.co/settings/tokens")

    # Create and launch the interface
    app = create_interface()
    app.launch()